This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e64cbd96c [INLONG-6544][Manager] Should update the source status after the inlong stream is completed (#6547) e64cbd96c is described below commit e64cbd96cf5a8771c3f617531230e26f33508e10 Author: healchow <healc...@gmail.com> AuthorDate: Tue Nov 15 20:20:48 2022 +0800 [INLONG-6544][Manager] Should update the source status after the inlong stream is completed (#6547) --- .../listener/group/InitGroupCompleteListener.java | 46 ++++++------ .../stream/InitStreamCompleteListener.java | 21 +++--- .../resource/sort/DefaultSortConfigOperator.java | 21 ------ .../service/stream/InlongStreamService.java | 11 ++- .../service/stream/InlongStreamServiceImpl.java | 87 +++++----------------- 5 files changed, 67 insertions(+), 119 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java index a16562fd8..bd848f996 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java @@ -72,30 +72,34 @@ public class InitGroupCompleteListener implements ProcessEventListener { String groupId = form.getInlongGroupId(); log.info("begin to execute InitGroupCompleteListener for groupId={}", groupId); - // update inlong group status and other info - InlongGroupInfo groupInfo = form.getGroupInfo(); - String operator = context.getOperator(); - groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator); - if (InlongGroupUtils.isBatchTask(form.getGroupInfo())) { - groupService.updateStatus(groupId, GroupStatus.FINISH.getCode(), operator); - } - InlongGroupEntity existGroup = 
groupMapper.selectByGroupId(groupId); - InlongGroupRequest updateGroupRequest = groupInfo.genRequest(); - updateGroupRequest.setVersion(existGroup.getVersion()); - groupService.update(updateGroupRequest, operator); + try { + // update inlong group status and other info + InlongGroupInfo groupInfo = form.getGroupInfo(); + String operator = context.getOperator(); + Integer nextStatus = InlongGroupUtils.isBatchTask(form.getGroupInfo()) + ? GroupStatus.FINISH.getCode() : GroupStatus.CONFIG_SUCCESSFUL.getCode(); + groupService.updateStatus(groupId, nextStatus, operator); + + InlongGroupEntity existGroup = groupMapper.selectByGroupId(groupId); + InlongGroupRequest updateGroupRequest = groupInfo.genRequest(); + updateGroupRequest.setVersion(existGroup.getVersion()); + groupService.update(updateGroupRequest, operator); - // update status of other related configs - if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) { - streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); - if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { - sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator); - } else { - sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); + // update status of other related configs + if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) { + streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); + if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { + sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator); + } else { + sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); + } } - } - log.info("success to execute InitGroupCompleteListener for groupId={}", groupId); - return ListenerResult.success(); + log.info("success to 
execute InitGroupCompleteListener for groupId={}", groupId); + return ListenerResult.success(); + } catch (Exception e) { + throw new WorkflowListenerException(e); + } } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java index 141635c46..2796bde0b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java @@ -61,16 +61,19 @@ public class InitStreamCompleteListener implements ProcessEventListener { final String streamId = streamInfo.getInlongStreamId(); final String operator = context.getOperator(); - // Update status of other related configs - streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); - streamService.update(streamInfo.genRequest(), operator); - if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) { - sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator); - } else { - sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); + try { + // Update status of other related configs + streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); + streamService.updateWithoutCheck(streamInfo.genRequest(), operator); + if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) { + sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator); + } else { + sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); + } + return ListenerResult.success(); + } 
catch (Exception e) { + throw new WorkflowListenerException(e); } - - return ListenerResult.success(); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 08ec23226..2664cee1e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils; import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils; import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils; import org.apache.inlong.manager.pojo.source.StreamSource; -import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.transform.TransformResponse; @@ -242,24 +241,4 @@ public class DefaultSortConfigOperator implements SortConfigOperator { groupInfo.getExtList().add(extInfo); } - /** - * Add config into inlong stream ext info - */ - private void addToStreamExt(List<InlongStreamInfo> streamInfos, String value) { - streamInfos.forEach(streamInfo -> { - if (streamInfo.getExtList() == null) { - streamInfo.setExtList(new ArrayList<>()); - } - - InlongStreamExtInfo extInfo = new InlongStreamExtInfo(); - extInfo.setInlongGroupId(streamInfo.getInlongGroupId()); - extInfo.setInlongStreamId(streamInfo.getInlongStreamId()); - extInfo.setKeyName(InlongConstants.DATAFLOW); - extInfo.setKeyValue(value); - - streamInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName())); - streamInfo.getExtList().add(extInfo); - }); - } - } diff 
--git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java index 3d17b3972..5a9b5b144 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java @@ -94,7 +94,7 @@ public interface InlongStreamService { List<InlongStreamBriefInfo> listBriefWithSink(String groupId); /** - * InlongStream info that needs to be modified + * Update the InlongStream info * * @param request inlong stream info that needs to be modified * @param operator Edit person's name @@ -102,6 +102,15 @@ public interface InlongStreamService { */ Boolean update(InlongStreamRequest request, String operator); + /** + * Update the InlongStream - not check the InlongGroup status to which the stream belongs. + * + * @param request inlong stream info that needs to be modified + * @param operator Edit person's name + * @return whether succeed + */ + Boolean updateWithoutCheck(InlongStreamRequest request, String operator); + /** * Delete the specified inlong stream. 
* <p/> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index d8a49c30b..93146e278 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -59,7 +59,6 @@ import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -281,8 +280,8 @@ public class InlongStreamServiceImpl implements InlongStreamService { return briefInfoList; } - @Transactional(rollbackFor = Throwable.class) @Override + @Transactional(rollbackFor = Throwable.class) public Boolean update(InlongStreamRequest request, String operator) { LOGGER.debug("begin to update inlong stream info={}", request); Preconditions.checkNotNull(request, "inlong stream request is empty"); @@ -292,22 +291,30 @@ public class InlongStreamServiceImpl implements InlongStreamService { Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage()); // Check if it can be modified - InlongGroupEntity inlongGroupEntity = this.checkGroupStatusIsTemp(groupId); + this.checkGroupStatusIsTemp(groupId); + + return this.updateWithoutCheck(request, operator); + } - // Make sure the stream was exists + @Override + @Transactional(rollbackFor = Throwable.class) + public Boolean updateWithoutCheck(InlongStreamRequest request, String operator) { + LOGGER.debug("begin to update inlong stream without check, request={}", request); + // make sure the stream was exists + String groupId = request.getInlongGroupId(); + String streamId = 
request.getInlongStreamId(); InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); if (streamEntity == null) { LOGGER.error("inlong stream not found by groupId={}, streamId={}", groupId, streamId); throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND); } - String errMsg = String.format("stream has already updated with group id=%s, stream id=%s, curVersion=%s", + + String errMsg = String.format("stream has already updated with groupId=%s, streamId=%s, curVersion=%s", streamEntity.getInlongGroupId(), streamEntity.getInlongStreamId(), request.getVersion()); if (!Objects.equals(streamEntity.getVersion(), request.getVersion())) { LOGGER.error(errMsg); throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); } - // Check whether the current inlong group status supports modification - this.doUpdateCheck(inlongGroupEntity.getStatus(), streamEntity, request); CommonBeanUtils.copyProperties(request, streamEntity, true); streamEntity.setModifier(operator); @@ -316,13 +323,12 @@ public class InlongStreamServiceImpl implements InlongStreamService { LOGGER.error(errMsg); throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); } - // Update field information + // update stream fields updateField(groupId, streamId, request.getFieldList()); - // Update extension info - List<InlongStreamExtInfo> extInfos = request.getExtList(); - saveOrUpdateExt(groupId, streamId, extInfos); + // update stream extension infos + saveOrUpdateExt(groupId, streamId, request.getExtList()); - LOGGER.info("success to update inlong stream for groupId={}", groupId); + LOGGER.info("success to update inlong stream without check for groupId={} streamId={}", groupId, streamId); return true; } @@ -465,7 +471,6 @@ public class InlongStreamServiceImpl implements InlongStreamService { @Override @Transactional(propagation = Propagation.REQUIRES_NEW) public boolean updateStatus(String groupId, String streamId, Integer status, String operator) { - LOGGER.debug("begin to update 
status by groupId={}, streamId={}", groupId, streamId); streamMapper.updateStatusByIdentifier(groupId, streamId, status, operator); LOGGER.info("success to update stream after approve for groupId=" + groupId + ", streamId=" + streamId); return true; @@ -563,66 +568,14 @@ public class InlongStreamServiceImpl implements InlongStreamService { private InlongGroupEntity checkGroupStatusIsTemp(String groupId) { InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); Preconditions.checkNotNull(entity, "groupId is invalid"); - // Add/modify/delete is not allowed under certain inlong group status + // Add/modify/delete is not allowed under temporary inlong group status GroupStatus curState = GroupStatus.forCode(entity.getStatus()); if (GroupStatus.isTempStatus(curState)) { - LOGGER.error("inlong group status was not allowed to add/update/delete inlong stream"); + LOGGER.error("inlong groupId={} status={} was not allowed to add/update/delete stream", groupId, curState); throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED); } return entity; } - /** - * Verify the fields that cannot be modified in the current inlong group status - * - * @param groupStatus Inlong group status - * @param streamEntity Original inlong stream entity - * @param request New inlong stream information - */ - private void doUpdateCheck(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) { - if (streamEntity == null || request == null) { - return; - } - - // Fields that are not allowed to be modified when the inlong group [configuration is successful] - if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupStatus)) { - checkUpdatedFields(streamEntity, request); - } - - // Inlong group [Waiting to submit] [Approval rejected] [Configuration failed], if there is a - // stream source/stream sink, the fields that are not allowed to be modified - List<Integer> statusList = Arrays.asList( - GroupStatus.TO_BE_SUBMIT.getCode(), - 
GroupStatus.APPROVE_REJECTED.getCode(), - GroupStatus.CONFIG_FAILED.getCode()); - if (statusList.contains(groupStatus)) { - String groupId = request.getInlongGroupId(); - String streamId = request.getInlongStreamId(); - // Whether there is undeleted stream source and sink - int sourceCount = sourceService.getCount(groupId, streamId); - int sinkCount = sinkService.getCount(groupId, streamId); - if (sourceCount > 0 || sinkCount > 0) { - checkUpdatedFields(streamEntity, request); - } - } - } - - /** - * Check that groupId, streamId are not allowed to be modified - */ - private void checkUpdatedFields(InlongStreamEntity streamEntity, InlongStreamRequest request) { - String newGroupId = request.getInlongGroupId(); - if (newGroupId != null && !newGroupId.equals(streamEntity.getInlongGroupId())) { - LOGGER.error("current status was not allowed to update inlong group id"); - throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED); - } - - String newStreamId = request.getInlongStreamId(); - if (newStreamId != null && !newStreamId.equals(streamEntity.getInlongStreamId())) { - LOGGER.error("current status was not allowed to update inlong stream id"); - throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED); - } - } - }