This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 649d09508 [INLONG-5885][Manager] Refactor the task issue logic to simplify code complexity (#5886) 649d09508 is described below commit 649d0950805799efd8922ec41e999e0b08cd6235 Author: woofyzhao <490467...@qq.com> AuthorDate: Wed Sep 14 15:31:55 2022 +0800 [INLONG-5885][Manager] Refactor the task issue logic to simplify code complexity (#5886) Co-authored-by: healchow <healc...@gmail.com> --- .../service/core/impl/AgentServiceImpl.java | 102 +++++++++++---------- .../inlong/manager/service/ServiceBaseTest.java | 1 + .../service/core/impl/AgentServiceTest.java | 68 ++++++++++++++ 3 files changed, 123 insertions(+), 48 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 2258581a9..111035c87 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -97,7 +97,7 @@ public class AgentServiceImpl implements AgentService { // Update task status, other tasks with status 20x will change to 30x in next request if (CollectionUtils.isEmpty(request.getCommandInfo())) { - LOGGER.warn("task result was empty, just return"); + LOGGER.info("task result was empty in request: {}, just return", request); return; } for (CommandEntity command : request.getCommandInfo()) { @@ -211,61 +211,17 @@ public class AgentServiceImpl implements AgentService { } final String agentIp = taskRequest.getAgentIp(); final String agentClusterName = taskRequest.getClusterName(); - final String uuid = taskRequest.getUuid(); Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName), "both agent ip and cluster 
name are blank when fetching file task"); List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList, Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName); List<DataConfig> fileTasks = Lists.newArrayList(); for (StreamSourceEntity sourceEntity : sourceEntities) { - FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(sourceEntity.getExtParams()); - final String destIp = sourceEntity.getAgentIp(); - final String destClusterName = sourceEntity.getInlongClusterName(); - - // Use ip directly if it is not empty - if (StringUtils.isNotBlank(destIp)) { - if (destIp.equals(agentIp)) { - int op = getOp(sourceEntity.getStatus()); - int nextStatus = getNextStatus(sourceEntity.getStatus()); - sourceEntity.setUuid(uuid); - sourceEntity.setStatus(nextStatus); - if (sourceMapper.updateByPrimaryKeySelective(sourceEntity) == 1) { - sourceEntity.setVersion(sourceEntity.getVersion() + 1); - fileTasks.add(getDataConfig(sourceEntity, op)); - } - } + DataConfig taskConfig = getFileTaskFromEntity(taskRequest, sourceEntity); + if (taskConfig == null) { continue; } - - // Cluster name is not blank, split subtask if necessary - // The template task's id is assigned to the subtask's template id field - if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName) - && Objects.isNull(sourceEntity.getTemplateId())) { - - // Is the task already fetched by this agent ? 
- List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId()); - if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) { - continue; - } - - // If not, clone a sub task for the new agent - // Note that a new source name with random suffix is generated to adhere to the unique constraint - StreamSourceEntity fileEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new); - FileSourceDTO childFileSourceDTO = CommonBeanUtils.copyProperties(fileSourceDTO, FileSourceDTO::new); - fileEntity.setExtParams(JsonUtils.toJsonString(childFileSourceDTO)); - fileEntity.setAgentIp(agentIp); - fileEntity.setUuid(uuid); - fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10)); - fileEntity.setTemplateId(sourceEntity.getId()); - int op = getOp(fileEntity.getStatus()); - int nextStatus = getNextStatus(fileEntity.getStatus()); - fileEntity.setStatus(nextStatus); - if (sourceMapper.insert(fileEntity) > 0) { - // refresh entity version and others. 
- fileEntity = sourceMapper.selectById(fileEntity.getId()); - fileTasks.add(getDataConfig(fileEntity, op)); - } - } + fileTasks.add(taskConfig); if (fileTasks.size() >= TASK_FETCH_SIZE) { break; } @@ -273,6 +229,56 @@ public class AgentServiceImpl implements AgentService { return fileTasks; } + private DataConfig getFileTaskFromEntity(TaskRequest taskRequest, StreamSourceEntity sourceEntity) { + final String agentIp = taskRequest.getAgentIp(); + final String uuid = taskRequest.getUuid(); + + // fetch task by designated agent ip + final String destIp = sourceEntity.getAgentIp(); + if (StringUtils.isNotBlank(destIp) && destIp.equals(agentIp)) { + int op = getOp(sourceEntity.getStatus()); + int nextStatus = getNextStatus(sourceEntity.getStatus()); + sourceEntity.setUuid(uuid); + sourceEntity.setStatus(nextStatus); + if (sourceMapper.updateByPrimaryKeySelective(sourceEntity) == 1) { + sourceEntity.setVersion(sourceEntity.getVersion() + 1); + return getDataConfig(sourceEntity, op); + } + } + + // fetch task by cluster name and template source + String destClusterName = sourceEntity.getInlongClusterName(); + boolean isTemplateTask = sourceEntity.getTemplateId() == null + && StringUtils.isNotBlank(destClusterName) + && destClusterName.equals(taskRequest.getClusterName()); + if (isTemplateTask) { + // is the task already fetched by this agent ? + List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId()); + if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) { + return null; + } + + // if not, clone a subtask for this Agent. 
+ // note: a new source name with random suffix is generated to adhere to the unique constraint + StreamSourceEntity fileEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new); + fileEntity.setExtParams(sourceEntity.getExtParams()); + fileEntity.setAgentIp(agentIp); + fileEntity.setUuid(uuid); + fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10)); + fileEntity.setTemplateId(sourceEntity.getId()); + int nextStatus = getNextStatus(fileEntity.getStatus()); + fileEntity.setStatus(nextStatus); + + // create new sub source task + sourceMapper.insert(fileEntity); + + // select again to refresh entity version and others. + return getDataConfig(sourceMapper.selectById(fileEntity.getId()), getOp(fileEntity.getStatus())); + } + + return null; + } + private List<DataConfig> fetchIssuedTasks(TaskRequest taskRequest) { final String agentIp = taskRequest.getAgentIp(); final String uuid = taskRequest.getUuid(); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java index b6f7a5507..0407c8bcc 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java @@ -50,6 +50,7 @@ public class ServiceBaseTest extends BaseTest { public static final String GLOBAL_GROUP_ID = "global_group"; public static final String GLOBAL_STREAM_ID = "global_stream"; public static final String GLOBAL_OPERATOR = "admin"; + public static final String GLOBAL_CLUSTER_NAME = "global_cluster"; private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBaseTest.class); @Autowired diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java index 0ddbe6791..d50acf694 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java @@ -17,9 +17,19 @@ package org.apache.inlong.manager.service.core.impl; +import com.google.common.collect.Lists; +import org.apache.inlong.common.constant.Constants; +import org.apache.inlong.common.db.CommandEntity; +import org.apache.inlong.common.enums.PullJobTypeEnum; +import org.apache.inlong.common.pojo.agent.DataConfig; +import org.apache.inlong.common.pojo.agent.TaskRequest; +import org.apache.inlong.common.pojo.agent.TaskResult; import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage; import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest; import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.enums.SourceStatus; +import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.source.file.FileSourceRequest; import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest; import org.apache.inlong.manager.service.ServiceBaseTest; import org.apache.inlong.manager.service.core.AgentService; @@ -56,6 +66,20 @@ class AgentServiceTest extends ServiceBaseTest { return sourceService.save(sourceInfo, GLOBAL_OPERATOR); } + /** + * Save template source + */ + public Integer saveTemplateSource() { + streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR); + FileSourceRequest sourceInfo = new FileSourceRequest(); + sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID); + sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID); + sourceInfo.setSourceType(SourceType.FILE); + sourceInfo.setSourceName("template_source_in_agent_service_test"); + 
sourceInfo.setInlongClusterName(GLOBAL_CLUSTER_NAME); + return sourceService.save(sourceInfo, GLOBAL_OPERATOR); + } + /** * Test report snapshot. */ @@ -78,4 +102,48 @@ class AgentServiceTest extends ServiceBaseTest { sourceService.delete(id, GLOBAL_OPERATOR); } + /** + * Test sub-source task status manipulation. + */ + @Test + void testGetAndReportSubSourceTask() { + // create template source for cluster agents and approve + final Integer templateId = this.saveTemplateSource(); + sourceService.updateStatus(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, SourceStatus.TO_BE_ISSUED_ADD.getCode(), + GLOBAL_OPERATOR); + + // get sub-source task + TaskRequest getRequest = new TaskRequest(); + getRequest.setAgentIp("127.0.0.1"); + getRequest.setClusterName(GLOBAL_CLUSTER_NAME); + getRequest.setPullJobType(PullJobTypeEnum.NEW.getType()); + TaskResult result = agentService.getTaskResult(getRequest); + Assertions.assertEquals(1, result.getDataConfigs().size()); + DataConfig subSourceTask = result.getDataConfigs().get(0); + // new sub-source version must be 1 + Assertions.assertEquals(1, subSourceTask.getVersion()); + // sub-source's id must be different from its template source + Assertions.assertNotEquals(templateId, subSourceTask.getTaskId()); + // operation is to add new task + Assertions.assertEquals(SourceStatus.BEEN_ISSUED_ADD.getCode() % 100, + Integer.valueOf(subSourceTask.getOp())); + + // report sub-source status + CommandEntity reportTask = new CommandEntity(); + reportTask.setTaskId(subSourceTask.getTaskId()); + reportTask.setVersion(subSourceTask.getVersion()); + reportTask.setCommandResult(Constants.RESULT_SUCCESS); + TaskRequest reportRequest = new TaskRequest(); + reportRequest.setAgentIp("127.0.0.1"); + reportRequest.setCommandInfo(Lists.newArrayList(reportTask)); + agentService.report(reportRequest); + + // check sub-source task status + StreamSource subSource = sourceService.get(subSourceTask.getTaskId()); + 
Assertions.assertEquals(SourceStatus.SOURCE_NORMAL.getCode(), subSource.getStatus()); + + sourceService.delete(templateId, GLOBAL_OPERATOR); + sourceService.delete(subSource.getId(), GLOBAL_OPERATOR); + } + }