This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 530bc4b23 [INLONG-5024][Manager] Improve the delete operation of InlongGroup (#6514) 530bc4b23 is described below commit 530bc4b23e06a219a31c9ecea3cefad921c01db4 Author: healchow <healc...@gmail.com> AuthorDate: Mon Nov 14 11:31:28 2022 +0800 [INLONG-5024][Manager] Improve the delete operation of InlongGroup (#6514) --- .../inlong/manager/common/enums/ErrorCodeEnum.java | 16 +- .../service/group/InlongGroupProcessService.java | 32 +-- .../manager/service/group/InlongGroupService.java | 75 +++--- .../service/group/InlongGroupServiceImpl.java | 285 +++++++++++---------- .../group/UpdateGroupCompleteListener.java | 10 +- .../listener/group/UpdateGroupFailedListener.java | 1 + .../service/stream/InlongStreamService.java | 23 +- .../service/stream/InlongStreamServiceImpl.java | 22 +- .../web/controller/InlongGroupController.java | 76 +++--- 9 files changed, 282 insertions(+), 258 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index 87a6e706c..f62e9e920 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -39,17 +39,17 @@ public enum ErrorCodeEnum { GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"), GROUP_DUPLICATE(1002, "Inlong group already exists"), GROUP_INFO_INCORRECT(1003, "Group info was incorrect"), - GROUP_SAVE_FAILED(1003, "Failed to save/update inlong group"), - GROUP_PERMISSION_DENIED(1004, "No permission to access this inlong group"), - GROUP_HAS_STREAM(1005, "There are some valid inlong stream for this inlong group"), + GROUP_SAVE_FAILED(1004, "Failed to save/update inlong group"), + GROUP_PERMISSION_DENIED(1005, "No permission to access this inlong group"), GROUP_UPDATE_NOT_ALLOWED(1006, "The current inlong group status does not support modification"), GROUP_DELETE_NOT_ALLOWED(1007, "The current inlong group status does not support deletion"), - GROUP_ID_UPDATE_NOT_ALLOWED(1008, "The current inlong group status does not support modifying the group id"), - GROUP_MIDDLEWARE_UPDATE_NOT_ALLOWED(1011, - "The current inlong group status does not support modifying the MQ type"), + GROUP_DELETE_HAS_STREAM(1008, "The inlong group contains inlong streams and is not allowed to be deleted"), + + GROUP_ID_UPDATE_NOT_ALLOWED(1010, "The current status does not support modifying the inlong group id"), + GROUP_MIDDLEWARE_UPDATE_NOT_ALLOWED(1011, "The current status does not support modifying the MQ type"), GROUP_NAME_UPDATE_NOT_ALLOWED(1012, "The current inlong group status does not support modifying the name"), GROUP_INFO_INCONSISTENT(1013, "The inlong group info is inconsistent, please contact the administrator"), - GROUP_MODE_UNSUPPORTED(1014, "The current inlong group mode only support lightweight, standard"), + GROUP_MODE_UNSUPPORTED(1014, "The current inlong group mode only support lightweight or standard"), OPT_NOT_ALLOWED_BY_STATUS(1021, "InlongGroup status %s was not allowed to add/update/delete related info"), @@ -74,7 +74,7 @@ public enum ErrorCodeEnum { STREAM_EXT_SAVE_FAILED(1207, "Failed to save/update inlong stream extension information"), STREAM_FIELD_SAVE_FAILED(1208, "Failed to save/update inlong stream field"), STREAM_DELETE_HAS_SOURCE(1209, "The inlong stream contains source info and is not allowed to be deleted"), - STREAM_DELETE_HAS_SINK(1210, "The inlong stream contains data sink info and is not allowed to be deleted"), + STREAM_DELETE_HAS_SINK(1210, "The inlong stream contains sink info and is not allowed to be deleted"), SOURCE_TYPE_IS_NULL(1300, "Source type is null"), SOURCE_TYPE_NOT_SUPPORT(1301, "Source type '%s' not support"), diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java index de044f0aa..0109cdec1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java @@ -185,36 +185,34 @@ public class InlongGroupProcessService { * @return inlong group id */ public String deleteProcessAsync(String groupId, String operator) { - LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator); + LOGGER.info("begin to delete group asynchronously for groupId={} by user={}", groupId, operator); EXECUTOR_SERVICE.execute(() -> { try { invokeDeleteProcess(groupId, operator); - } catch (Exception ex) { - LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex); - throw ex; + } catch (Exception e) { + LOGGER.error(String.format("failed to async delete group for groupId=%s by %s", groupId, operator), e); + throw e; } - groupService.delete(groupId, operator); }); - LOGGER.info("success to delete process asynchronously for groupId={} by operator={}", groupId, operator); + LOGGER.info("success to delete group asynchronously for groupId={} by user={}", groupId, operator); return groupId; } /** - * Delete InlongGroup logically and delete related resource in an asynchronous way. + * Delete InlongGroup logically and delete related resource in a synchronous way. */ - public boolean deleteProcess(String groupId, String operator) { - LOGGER.info("begin to delete process for groupId={} by operator={}", groupId, operator); + public Boolean deleteProcess(String groupId, String operator) { + LOGGER.info("begin to delete group for groupId={} by user={}", groupId, operator); try { invokeDeleteProcess(groupId, operator); - } catch (Exception ex) { - LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex); - throw ex; + } catch (Exception e) { + LOGGER.error(String.format("failed to delete group for groupId=%s by user=%s", groupId, operator), e); + throw e; } - boolean result = groupService.delete(groupId, operator); - LOGGER.info("success to delete process for groupId={} by operator={}", groupId, operator); - return result; + LOGGER.info("success to delete group for groupId={} by user={}", groupId, operator); + return true; } /** @@ -287,7 +285,9 @@ public class InlongGroupProcessService { } private void invokeDeleteProcess(String groupId, String operator) { - InlongGroupInfo groupInfo = groupService.get(groupId); + // check can be deleted + InlongGroupInfo groupInfo = groupService.doDeleteCheck(groupId, operator); + // start to delete group process GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.DELETE); workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java index 3c07faa28..58acbfb27 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java @@ -45,6 +45,14 @@ public interface InlongGroupService { */ String save(InlongGroupRequest groupInfo, String operator); + /** + * Query whether the specified group id exists + * + * @param groupId the group id to be queried + * @return does it exist + */ + Boolean exist(String groupId); + /** * Get inlong group info based on inlong group id * @@ -53,6 +61,31 @@ public interface InlongGroupService { */ InlongGroupInfo get(String groupId); + /** + * Query the group information of each status of the current user + * + * @param operator name of operator + * @return inlong group status statistics + */ + InlongGroupCountResponse countGroupByUser(String operator); + + /** + * According to the group id, query the topic to which it belongs + * + * @param groupId Inlong group id + * @return Topic information + * @apiNote TubeMQ corresponds to the group, only 1 topic + */ + InlongGroupTopicInfo getTopic(String groupId); + + /** + * According to the group id, query the backup topic to which it belongs + * + * @param groupId inlong group id + * @return backup topic info + */ + InlongGroupTopicInfo getBackupTopic(String groupId); + /** * Paging query inlong group brief info list * @@ -79,48 +112,26 @@ public interface InlongGroupService { * @param operator name of operator * @return whether succeed */ - boolean updateStatus(String groupId, Integer status, String operator); + Boolean updateStatus(String groupId, Integer status, String operator); /** - * Delete the group information of the specified group id + * Check whether deletion is supported for the specified group. * - * @param groupId The group id that needs to be deleted + * @param groupId inlong group id * @param operator name of operator - * @return whether succeed - */ - boolean delete(String groupId, String operator); - - /** - * Query whether the specified group id exists - * - * @param groupId the group id to be queried - * @return does it exist + * @return inlong group info */ - Boolean exist(String groupId); + InlongGroupInfo doDeleteCheck(String groupId, String operator); /** - * Query the group information of each status of the current user + * Delete the group information of the specified group id * + * @param groupId The group id that needs to be deleted * @param operator name of operator - * @return inlong group status statistics - */ - InlongGroupCountResponse countGroupByUser(String operator); - - /** - * According to the group id, query the topic to which it belongs - * - * @param groupId inlong group id - * @return topic info - */ - InlongGroupTopicInfo getTopic(String groupId); - - /** - * According to the group id, query the backup topic to which it belongs - * - * @param groupId inlong group id - * @return backup topic info + * @return whether succeed + * @apiNote Before invoking this delete method, you must */ - InlongGroupTopicInfo getBackupTopic(String groupId); + Boolean delete(String groupId, String operator); /** * Save the group modified when the approval is passed diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 6a5839065..9c143fd67 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -30,7 +30,6 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; @@ -118,7 +117,7 @@ public class InlongGroupServiceImpl implements InlongGroupService { * @param request request of updated * @param operator current operator */ - private static void checkGroupCanUpdate(InlongGroupEntity entity, InlongGroupRequest request, String operator) { + private static void doUpdateCheck(InlongGroupEntity entity, InlongGroupRequest request, String operator) { if (entity == null || request == null) { return; } @@ -146,8 +145,8 @@ public class InlongGroupServiceImpl implements InlongGroupService { } } - @Transactional(rollbackFor = Throwable.class) @Override + @Transactional(rollbackFor = Throwable.class) public String save(InlongGroupRequest request, String operator) { LOGGER.debug("begin to save inlong group={} by user={}", request, operator); Preconditions.checkNotNull(request, "inlong group request cannot be empty"); @@ -158,6 +157,7 @@ public class InlongGroupServiceImpl implements InlongGroupService { LOGGER.error("groupId {} has already exists", groupId); throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE); } + request.setEnableZookeeper(enableZookeeper ? InlongConstants.ENABLE_ZK : InlongConstants.DISABLE_ZK); InlongGroupOperator instance = groupOperatorFactory.getInstance(request.getMqType()); groupId = instance.saveOpt(request, operator); @@ -169,6 +169,14 @@ public class InlongGroupServiceImpl implements InlongGroupService { return groupId; } + @Override + public Boolean exist(String groupId) { + Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); + InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + LOGGER.debug("success to check inlong group {}, exist? {}", groupId, entity != null); + return entity != null; + } + @Override public InlongGroupInfo get(String groupId) { Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); @@ -192,6 +200,75 @@ public class InlongGroupServiceImpl implements InlongGroupService { return groupInfo; } + @Override + public InlongGroupCountResponse countGroupByUser(String operator) { + InlongGroupCountResponse countVO = new InlongGroupCountResponse(); + List<Map<String, Object>> statusCount = groupMapper.countGroupByUser(operator); + for (Map<String, Object> map : statusCount) { + int status = (Integer) map.get("status"); + long count = (Long) map.get("count"); + countVO.setTotalCount(countVO.getTotalCount() + count); + if (status == GroupStatus.CONFIG_ING.getCode()) { + countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count); + } else if (status == GroupStatus.TO_BE_APPROVAL.getCode()) { + countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count); + } else if (status == GroupStatus.APPROVE_REJECTED.getCode()) { + countVO.setRejectCount(countVO.getRejectCount() + count); + } + } + + LOGGER.debug("success to count inlong group for operator={}", operator); + return countVO; + } + + @Override + public InlongGroupTopicInfo getTopic(String groupId) { + // the group info will not null in get() method + InlongGroupInfo groupInfo = this.get(groupId); + InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType()); + InlongGroupTopicInfo topicInfo = groupOperator.getTopic(groupInfo); + + // set the base params + topicInfo.setInlongGroupId(groupId); + String clusterTag = groupInfo.getInlongClusterTag(); + topicInfo.setInlongClusterTag(clusterTag); + + // assert: each MQ type has a corresponding type of cluster + List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, groupInfo.getMqType()); + topicInfo.setClusterInfos(clusterInfos); + + LOGGER.debug("success to get topic for groupId={}, result={}", groupId, topicInfo); + return topicInfo; + } + + @Override + public InlongGroupTopicInfo getBackupTopic(String groupId) { + // backup topic info saved in the ext table + InlongGroupExtEntity extEntity = groupExtMapper.selectByUniqueKey(groupId, BACKUP_CLUSTER_TAG); + if (extEntity == null || StringUtils.isBlank(extEntity.getKeyValue())) { + LOGGER.warn("not found any backup topic for groupId={}", groupId); + return null; + } + + // the group info will not null in get() method + InlongGroupInfo groupInfo = this.get(groupId); + InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType()); + InlongGroupTopicInfo backupTopicInfo = groupOperator.getBackupTopic(groupInfo); + + // set the base params + backupTopicInfo.setInlongGroupId(groupId); + String backupClusterTag = extEntity.getKeyValue(); + backupTopicInfo.setInlongClusterTag(backupClusterTag); + + // set backup cluster info + // assert: each MQ type has a corresponding type of cluster + List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType()); + backupTopicInfo.setClusterInfos(clusterInfos); + + LOGGER.debug("success to get backup topic for groupId={}, result={}", groupId, backupTopicInfo); + return backupTopicInfo; + } + @Override public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request) { if (request.getPageSize() > MAX_PAGE_SIZE) { @@ -231,7 +308,8 @@ public class InlongGroupServiceImpl implements InlongGroupService { } @Override - @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, + @Transactional(rollbackFor = Throwable.class, + isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW) public String update(InlongGroupRequest request, String operator) { LOGGER.debug("begin to update inlong group={} by user={}", request, operator); @@ -248,7 +326,7 @@ public class InlongGroupServiceImpl implements InlongGroupService { throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); } // check whether the current status can be modified - checkGroupCanUpdate(entity, request, operator); + doUpdateCheck(entity, request, operator); request.setEnableZookeeper(enableZookeeper ? InlongConstants.ENABLE_ZK : InlongConstants.DISABLE_ZK); InlongGroupOperator instance = groupOperatorFactory.getInstance(request.getMqType()); @@ -264,7 +342,7 @@ public class InlongGroupServiceImpl implements InlongGroupService { @Override @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW) - public boolean updateStatus(String groupId, Integer status, String operator) { + public Boolean updateStatus(String groupId, Integer status, String operator) { LOGGER.info("begin to update group status to [{}] for groupId={} by user={}", status, groupId, operator); Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); InlongGroupEntity entity = groupMapper.selectByGroupIdForUpdate(groupId); @@ -287,137 +365,6 @@ public class InlongGroupServiceImpl implements InlongGroupService { return true; } - @Override - @Transactional(rollbackFor = Throwable.class) - public boolean delete(String groupId, String operator) { - LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator); - Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); - - InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); - if (entity == null) { - LOGGER.error("inlong group not found by groupId={}", groupId); - throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND); - } - - // Determine whether the current status can be deleted - GroupStatus curState = GroupStatus.forCode(entity.getStatus()); - if (GroupStatus.notAllowedTransition(curState, GroupStatus.DELETED)) { - String errMsg = String.format("Current status=%s was not allowed to delete", curState); - LOGGER.error(errMsg); - throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, errMsg); - } - - /* - If the status allowed logic delete, all associated data can be logically deleted. - In other status, you need to delete the related "inlong stream" first. - When deleting a related inlong stream, you also need to check whether - there are some related "stream source" and "stream sink" - */ - if (GroupStatus.allowedLogicDelete(curState)) { - streamService.logicDeleteAll(entity.getInlongGroupId(), operator); - } else { - int count = streamService.selectCountByGroupId(groupId); - if (count >= 1) { - LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, count); - throw new BusinessException(ErrorCodeEnum.GROUP_HAS_STREAM); - } - } - - // update the group after deleting related info - entity.setIsDeleted(entity.getId()); - entity.setStatus(GroupStatus.DELETED.getCode()); - entity.setModifier(operator); - int rowCount = groupMapper.updateByIdentifierSelective(entity); - if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { - LOGGER.error("inlong group has already updated with group id={}, curVersion={}", - entity.getInlongGroupId(), entity.getVersion()); - throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); - } - - // logically delete the associated extension info - groupExtMapper.logicDeleteAllByGroupId(groupId); - - LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator); - return true; - } - - @Override - public Boolean exist(String groupId) { - Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); - InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); - LOGGER.debug("success to check inlong group {}, exist? {}", groupId, entity != null); - return entity != null; - } - - @Override - public InlongGroupCountResponse countGroupByUser(String operator) { - InlongGroupCountResponse countVO = new InlongGroupCountResponse(); - List<Map<String, Object>> statusCount = groupMapper.countGroupByUser(operator); - for (Map<String, Object> map : statusCount) { - int status = (Integer) map.get("status"); - long count = (Long) map.get("count"); - countVO.setTotalCount(countVO.getTotalCount() + count); - if (status == GroupStatus.CONFIG_ING.getCode()) { - countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count); - } else if (status == GroupStatus.TO_BE_APPROVAL.getCode()) { - countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count); - } else if (status == GroupStatus.APPROVE_REJECTED.getCode()) { - countVO.setRejectCount(countVO.getRejectCount() + count); - } - } - - LOGGER.debug("success to count inlong group for operator={}", operator); - return countVO; - } - - @Override - public InlongGroupTopicInfo getTopic(String groupId) { - // the group info will not null in get() method - InlongGroupInfo groupInfo = this.get(groupId); - InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType()); - InlongGroupTopicInfo topicInfo = groupOperator.getTopic(groupInfo); - - // set the base params - topicInfo.setInlongGroupId(groupId); - String clusterTag = groupInfo.getInlongClusterTag(); - topicInfo.setInlongClusterTag(clusterTag); - - // assert: each MQ type has a corresponding type of cluster - List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, groupInfo.getMqType()); - topicInfo.setClusterInfos(clusterInfos); - - LOGGER.debug("success to get topic for groupId={}, result={}", groupId, topicInfo); - return topicInfo; - } - - @Override - public InlongGroupTopicInfo getBackupTopic(String groupId) { - // backup topic info saved in the ext table - InlongGroupExtEntity extEntity = groupExtMapper.selectByUniqueKey(groupId, BACKUP_CLUSTER_TAG); - if (extEntity == null || StringUtils.isBlank(extEntity.getKeyValue())) { - LOGGER.warn("not found any backup topic for groupId={}", groupId); - return null; - } - - // the group info will not null in get() method - InlongGroupInfo groupInfo = this.get(groupId); - InlongGroupOperator groupOperator = groupOperatorFactory.getInstance(groupInfo.getMqType()); - InlongGroupTopicInfo backupTopicInfo = groupOperator.getBackupTopic(groupInfo); - - // set the base params - backupTopicInfo.setInlongGroupId(groupId); - String backupClusterTag = extEntity.getKeyValue(); - backupTopicInfo.setInlongClusterTag(backupClusterTag); - - // set backup cluster info - // assert: each MQ type has a corresponding type of cluster - List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType()); - backupTopicInfo.setClusterInfos(clusterInfos); - - LOGGER.debug("success to get backup topic for groupId={}, result={}", groupId, backupTopicInfo); - return backupTopicInfo; - } - @Override @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW) public void updateAfterApprove(InlongGroupApproveRequest approveRequest, String operator) { @@ -427,10 +374,10 @@ public class InlongGroupServiceImpl implements InlongGroupService { // only the [TO_BE_APPROVAL] status allowed the passing operation InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); if (entity == null) { - throw new WorkflowListenerException("inlong group not found with group id=" + groupId); + throw new BusinessException("inlong group not found with group id=" + groupId); } if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) { - throw new WorkflowListenerException("inlong group status is [wait_approval], not allowed to approve again"); + throw new BusinessException("inlong group status [wait_approval] not allowed to approve again"); } // bind cluster tag and update status to [GROUP_APPROVE_PASSED] @@ -468,6 +415,64 @@ public class InlongGroupServiceImpl implements InlongGroupService { LOGGER.info("success to save or update inlong group ext for groupId={}", groupId); } + @Override + public InlongGroupInfo doDeleteCheck(String groupId, String operator) { + InlongGroupInfo groupInfo = this.get(groupId); + // only the person in charges can update + List<String> inCharges = Arrays.asList(groupInfo.getInCharges().split(InlongConstants.COMMA)); + if (!inCharges.contains(operator)) { + LOGGER.error("user [{}] has no privilege for the inlong group", operator); + throw new BusinessException(ErrorCodeEnum.GROUP_PERMISSION_DENIED); + } + + // determine whether the current status can be deleted + GroupStatus curState = GroupStatus.forCode(groupInfo.getStatus()); + if (GroupStatus.notAllowedTransition(curState, GroupStatus.DELETING)) { + String errMsg = String.format("current group status=%s was not allowed to delete", curState); + LOGGER.error(errMsg); + throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, errMsg); + } + + // If the status allowed logic delete, all associated info will be logically deleted. + // In other status, you need to delete the related "inlong_stream" first. + if (!GroupStatus.allowedLogicDelete(curState)) { + int count = streamService.selectCountByGroupId(groupId); + if (count >= 1) { + LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, count); + throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_HAS_STREAM); + } + } + + return groupInfo; + } + + @Override + @Transactional(rollbackFor = Throwable.class) + public Boolean delete(String groupId, String operator) { + LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator); + InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); + Preconditions.checkNotNull(entity, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage()); + + entity.setIsDeleted(entity.getId()); + entity.setStatus(GroupStatus.DELETED.getCode()); + entity.setModifier(operator); + int rowCount = groupMapper.updateByIdentifierSelective(entity); + if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { + LOGGER.error("inlong group has already updated for groupId={} curVersion={}", groupId, entity.getVersion()); + throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); + } + + // logically delete the associated extension info + groupExtMapper.logicDeleteAllByGroupId(groupId); + + if (GroupStatus.allowedLogicDelete(GroupStatus.forCode(entity.getStatus()))) { + streamService.logicDeleteAll(groupId, operator); + } + + LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator); + return true; + } + private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) { Map<String, String> extMap = new HashMap<>(); extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), extInfo.getKeyValue())); @@ -482,7 +487,7 @@ public class InlongGroupServiceImpl implements InlongGroupService { case USER_DEFINED: return createUserDefinedSortConfig(extMap); default: - LOGGER.warn("Unsupported sort config for sortType:{}", sortType); + LOGGER.warn("unsupported sort config for sortType: {}", sortType); return null; } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java index 8a4116100..4c76c62bf 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java @@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.enums.SourceStatus; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.group.InlongGroupRequest; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.service.group.InlongGroupService; import org.apache.inlong.manager.service.source.StreamSourceService; @@ -58,23 +59,26 @@ public class UpdateGroupCompleteListener implements ProcessEventListener { log.info("begin to execute UpdateGroupCompleteListener for groupId={}, operateType={}", groupId, operateType); // update inlong group status and other configs + InlongGroupInfo groupInfo = form.getGroupInfo(); + InlongGroupRequest groupRequest = groupInfo.genRequest(); String operator = context.getOperator(); switch (operateType) { case SUSPEND: groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), operator); + groupService.update(groupRequest, operator); break; case RESTART: groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), operator); + groupService.update(groupRequest, operator); break; case DELETE: - groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), operator); + // delete process completed, then delete the group info + groupService.delete(groupId, operator); break; default: log.warn("unsupported operate={} for inlong group", operateType); break; } - InlongGroupInfo groupInfo = form.getGroupInfo(); - groupService.update(groupInfo.genRequest(), operator); // if the inlong group is lightweight mode, the stream source needs to be processed. if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java index 2741c50d4..39f02657c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupFailedListener.java @@ -51,6 +51,7 @@ public class UpdateGroupFailedListener implements ProcessEventListener { // update inlong group status and other info String operator = context.getOperator(); + // delete process failed, then change the group status to [CONFIG_FAILED] groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), operator); groupService.update(form.getGroupInfo().genRequest(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java index bdbdef4d4..3d17b3972 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java @@ -43,6 +43,15 @@ public interface InlongStreamService { */ Integer save(InlongStreamRequest request, String operator); + /** + * Query whether the inlong stream ID exists + * + * @param groupId inlong group id + * @param streamId inlong stream id + * @return true: exists, false: does not exist + */ + Boolean exist(String groupId, String streamId); + /** * Query the details of the specified inlong stream * @@ -60,15 +69,6 @@ public interface InlongStreamService { */ List<InlongStreamInfo> list(String groupId); - /** - * Query whether the inlong stream ID exists - * - * @param groupId inlong group id - * @param streamId inlong stream id - * @return true: exists, false: does not exist - */ - Boolean exist(String groupId, String streamId); - /** * Paging query inlong stream brief info list * @@ -103,7 +103,10 @@ public interface InlongStreamService { Boolean update(InlongStreamRequest request, String operator); /** - * Delete the specified inlong stream + * Delete the specified inlong stream. + * <p/> + * When deleting an inlong stream, you need to check whether there are some related + * stream_sources or stream_sinks * * @param groupId Inlong group id * @param streamId Inlong stream id diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 633d46fca..043df26df 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -126,6 +126,14 @@ public class InlongStreamServiceImpl implements InlongStreamService { return streamEntity.getId(); } + @Override + public Boolean exist(String groupId, String streamId) { + Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); + Preconditions.checkNotNull(groupId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage()); + InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); + return streamEntity != null; + } + @Override public InlongStreamInfo get(String groupId, String streamId) { LOGGER.debug("begin to get inlong stream by groupId={}, streamId={}", groupId, streamId); @@ -183,14 +191,6 @@ public class InlongStreamServiceImpl implements InlongStreamService { return streamList; } - @Override - public Boolean exist(String groupId, String streamId) { - Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); - Preconditions.checkNotNull(groupId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage()); - InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); - return streamEntity != null; - } - /** * Query and set the extended information and data source fields of the inlong stream */ @@ -307,7 +307,7 @@ public class InlongStreamServiceImpl implements InlongStreamService { throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); } // Check whether the current inlong group status supports modification - this.checkCanUpdate(inlongGroupEntity.getStatus(), streamEntity, request); + this.doUpdateCheck(inlongGroupEntity.getStatus(), streamEntity, request); CommonBeanUtils.copyProperties(request, streamEntity, true); streamEntity.setModifier(operator); @@ -326,8 +326,8 @@ public class InlongStreamServiceImpl implements InlongStreamService { return true; } - @Transactional(rollbackFor = Throwable.class) @Override + @Transactional(rollbackFor = Throwable.class) public Boolean delete(String groupId, String streamId, String operator) { LOGGER.debug("begin to delete inlong stream, groupId={}, streamId={}", groupId, streamId); Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); @@ -580,7 +580,7 @@ public class InlongStreamServiceImpl implements InlongStreamService { * @param streamEntity Original inlong stream entity * @param request New inlong stream information */ - private void checkCanUpdate(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) { + private void doUpdateCheck(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) { if (streamEntity == null || request == null) { return; } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 959d005e4..e3375618c 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -68,6 +68,13 @@ public class InlongGroupController { return Response.success(groupService.save(groupRequest, operator)); } + @RequestMapping(value = "/group/exist/{groupId}", method = RequestMethod.GET) + @ApiOperation(value = "Is the inlong group id exists") + @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) + public Response<Boolean> exist(@PathVariable String groupId) { + return Response.success(groupService.exist(groupId)); + } + @RequestMapping(value = "/group/get/{groupId}", method = RequestMethod.GET) @ApiOperation(value = "Get inlong group") @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) @@ -75,6 +82,25 @@ public class InlongGroupController { return Response.success(groupService.get(groupId)); } + @RequestMapping(value = "/group/countByStatus", method = RequestMethod.GET) + @ApiOperation(value = "Count inlong group status for current user") + public Response<InlongGroupCountResponse> countGroupByUser() { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupService.countGroupByUser(operator)); + } + + @GetMapping(value = "/group/getTopic/{groupId}") + @ApiOperation(value = "Get topic info") + public Response<InlongGroupTopicInfo> getTopic(@PathVariable String groupId) { + return Response.success(groupService.getTopic(groupId)); + } + + @GetMapping(value = "/group/getBackupTopic/{groupId}") + @ApiOperation(value = "Get backup topic info") + public Response<InlongGroupTopicInfo> getBackupTopic(@PathVariable String groupId) { + return Response.success(groupService.getBackupTopic(groupId)); + } + @RequestMapping(value = "/group/list", method = RequestMethod.POST) @ApiOperation(value = "List inlong groups by paginating") public Response<PageResult<InlongGroupBriefInfo>> listBrief(@RequestBody InlongGroupPageRequest request) { @@ -91,30 +117,22 @@ public class InlongGroupController { return Response.success(groupService.update(groupRequest, operator)); } - @RequestMapping(value = "/group/exist/{groupId}", method = RequestMethod.GET) - @ApiOperation(value = "Is the inlong group id exists") + @RequestMapping(value = "/group/delete/{groupId}", method = RequestMethod.DELETE) + @ApiOperation(value = "Delete inlong group info") + @OperationLog(operation = OperationType.DELETE) @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) - public Response<Boolean> exist(@PathVariable String groupId) { - return Response.success(groupService.exist(groupId)); - } - - @RequestMapping(value = "/group/countByStatus", method = RequestMethod.GET) - @ApiOperation(value = "Count inlong group status for current user") - public Response<InlongGroupCountResponse> countGroupByUser() { + public Response<Boolean> delete(@PathVariable String groupId) { String operator = LoginUserUtils.getLoginUser().getName(); - return Response.success(groupService.countGroupByUser(operator)); - } - - @GetMapping(value = "/group/getTopic/{groupId}") - @ApiOperation(value = "Get topic info") - public Response<InlongGroupTopicInfo> getTopic(@PathVariable String groupId) { - return Response.success(groupService.getTopic(groupId)); + return Response.success(groupProcessOperation.deleteProcess(groupId, operator)); } - @GetMapping(value = "/group/getBackupTopic/{groupId}") - @ApiOperation(value = "Get backup topic info") - public Response<InlongGroupTopicInfo> getBackupTopic(@PathVariable String groupId) { - return Response.success(groupService.getBackupTopic(groupId)); + @RequestMapping(value = "/group/deleteAsync/{groupId}", method = RequestMethod.DELETE) + @ApiOperation(value = "Delete inlong group info") + @OperationLog(operation = OperationType.DELETE) + @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) + public Response<String> deleteAsync(@PathVariable String groupId) { + String operator = LoginUserUtils.getLoginUser().getName(); + return Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator)); } @RequestMapping(value = "/group/startProcess/{groupId}", method = RequestMethod.POST) @@ -141,15 +159,6 @@ public class InlongGroupController { return Response.success(groupProcessOperation.restartProcess(groupId, operator)); } - @RequestMapping(value = "/group/delete/{groupId}", method = RequestMethod.DELETE) - @ApiOperation(value = "Delete inlong group info") - @OperationLog(operation = OperationType.DELETE) - @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) - public Response<Boolean> delete(@PathVariable String groupId) { - String operator = LoginUserUtils.getLoginUser().getName(); - return Response.success(groupProcessOperation.deleteProcess(groupId, operator)); - } - @RequestMapping(value = "/group/suspendProcessAsync/{groupId}", method = RequestMethod.POST) @ApiOperation(value = "Suspend inlong group process") @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class) @@ -166,15 +175,6 @@ public class InlongGroupController { return Response.success(groupProcessOperation.restartProcessAsync(groupId, operator)); } - @RequestMapping(value = "/group/deleteAsync/{groupId}", method = RequestMethod.DELETE) - @ApiOperation(value = "Delete inlong group info") - @OperationLog(operation = OperationType.DELETE) - @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) - public Response<String> deleteAsync(@PathVariable String groupId) { - String operator = LoginUserUtils.getLoginUser().getName(); - return Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator)); - } - @PostMapping(value = "/group/reset") @ApiOperation(value = "Reset group status when group is in CONFIG_ING|SUSPENDING|RESTARTING|DELETING") public Response<Boolean> reset(@RequestBody @Validated InlongGroupResetRequest request) {