This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 8d853493e4 [INLONG-9314][Manager] Support cluster switch for InlongGroup (#9353) 8d853493e4 is described below commit 8d853493e42f8144257c3caeb6df5d1777df714b Author: vernedeng <verned...@apache.org> AuthorDate: Tue Dec 5 09:35:15 2023 +0800 [INLONG-9314][Manager] Support cluster switch for InlongGroup (#9353) --- .../inlong/common/constant/ClusterSwitch.java | 7 + .../inlong/manager/common/consts/SinkType.java | 15 ++ .../inlong/manager/common/enums/ErrorCodeEnum.java | 2 + .../dao/mapper/TenantClusterTagEntityMapper.java | 2 + .../mappers/TenantClusterTagEntityMapper.xml | 8 + .../manager/service/group/InlongGroupService.java | 4 + .../service/group/InlongGroupServiceImpl.java | 164 +++++++++++++++++++++ .../AbstractStandaloneSinkResourceOperator.java | 3 +- .../web/controller/InlongGroupController.java | 18 +++ 9 files changed, 222 insertions(+), 1 deletion(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java index 90816730ce..847162da1d 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java @@ -31,4 +31,11 @@ public class ClusterSwitch { * MQ resource for backup, represents the namespace of Pulsar, the topic of TubeMQ, etc. 
*/ public static final String BACKUP_MQ_RESOURCE = "backup_mq_resource"; + + /** + * Cluster switch start time + */ + public static final String CLUSTER_SWITCH_TIME = "cluster_switch_time"; + + public static final int FINISH_SWITCH_INTERVAL_MIN = 10; } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index b9b8a17350..68e80e7d07 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -17,9 +17,13 @@ package org.apache.inlong.manager.common.consts; +import org.apache.inlong.manager.common.enums.ClusterType; + import java.lang.reflect.Field; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -74,10 +78,18 @@ public class SinkType extends StreamType { @SupportSortType(sortType = SortType.SORT_STANDALONE) public static final String CLS = "CLS"; + public static final Map<String, String> SINK_TO_CLUSTER = new HashMap<>(); + public static final Set<String> SORT_FLINK_SINK = new HashSet<>(); public static final Set<String> SORT_STANDALONE_SINK = new HashSet<>(); + static { + SINK_TO_CLUSTER.put(CLS, ClusterType.SORT_CLS); + SINK_TO_CLUSTER.put(ELASTICSEARCH, ClusterType.SORT_ES); + SINK_TO_CLUSTER.put(PULSAR, ClusterType.SORT_PULSAR); + } + static { SinkType obj = new SinkType(); Class<? 
extends SinkType> clazz = obj.getClass(); @@ -98,4 +110,7 @@ public class SinkType extends StreamType { return sinkTypes.stream().anyMatch(SORT_FLINK_SINK::contains); } + public static String relatedSortClusterType(String sinkType) { + return SINK_TO_CLUSTER.get(sinkType); + } } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index 289ddba60d..9d20be566d 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -68,6 +68,8 @@ public enum ErrorCodeEnum { CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"), CLUSTER_TAG_NOT_FOUND(1104, "Cluster tag information does not exist"), + TENANT_CLUSTER_TAG_NOT_FOUND(1105, "Tenant Cluster tag does not exist"), + DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"), DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"), DATA_NODE_ID_CHANGED(1152, "Data node information's id not equals"), diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java index 95d4beaa46..f9b34f2524 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java @@ -34,6 +34,8 @@ public interface TenantClusterTagEntityMapper { TenantClusterTagEntity selectByPrimaryKey(Integer id); + TenantClusterTagEntity selectByUniqueKey(String clusterTag, String tenant); + List<TenantClusterTagEntity> selectByTag(String clusterTag); 
List<TenantClusterTagEntity> selectByCondition(TenantClusterTagPageRequest request); diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml index 8e9e78dd22..3de752e8ca 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml @@ -41,6 +41,14 @@ from tenant_cluster_tag where id = #{id,jdbcType=INTEGER} </select> + <select id="selectByUniqueKey" resultMap="BaseResultMap"> + select + <include refid="Base_Column_List" /> + from tenant_cluster_tag + where tenant = #{tenant,jdbcType=VARCHAR} + and cluster_tag = #{clusterTag,jdbcType=VARCHAR} + and is_deleted = 0 + </select> <select id="selectByTag" parameterType="java.lang.String" resultMap="BaseResultMap"> select <include refid="Base_Column_List" /> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java index c344548ddb..d2f8b09048 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java @@ -207,4 +207,8 @@ public interface InlongGroupService { */ Map<String, Object> detail(String groupId); + Boolean startTagSwitch(String groupId, String clusterTag); + + Boolean finishTagSwitch(String groupId); + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 62f9ee7da3..2d87b74995 100644 --- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -20,21 +20,27 @@ package org.apache.inlong.manager.service.group; import org.apache.inlong.manager.common.auth.Authentication.AuthType; import org.apache.inlong.manager.common.auth.SecretTokenAuthentication; import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.enums.ProcessName; import org.apache.inlong.manager.common.enums.TenantUserTypeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.dao.entity.TenantClusterTagEntity; +import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper; import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; +import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import 
org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; @@ -48,16 +54,22 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest; import org.apache.inlong.manager.pojo.group.InlongGroupRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.sort.BaseSortConf; import org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType; import org.apache.inlong.manager.pojo.sort.FlinkSortConf; import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf; import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.user.LoginUserUtils; import org.apache.inlong.manager.pojo.user.UserInfo; +import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.service.cluster.InlongClusterService; +import org.apache.inlong.manager.service.sink.StreamSinkService; import org.apache.inlong.manager.service.source.SourceOperatorFactory; import org.apache.inlong.manager.service.source.StreamSourceOperator; import org.apache.inlong.manager.service.stream.InlongStreamService; +import org.apache.inlong.manager.service.workflow.WorkflowService; import com.fasterxml.jackson.core.type.TypeReference; import com.github.pagehelper.Page; @@ -76,6 +88,7 @@ import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -86,7 +99,11 @@ import java.util.Set; import java.util.stream.Collectors; import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_CLUSTER_TAG; +import static org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE; +import static org.apache.inlong.common.constant.ClusterSwitch.CLUSTER_SWITCH_TIME; +import static org.apache.inlong.common.constant.ClusterSwitch.FINISH_SWITCH_INTERVAL_MIN; import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE; +import static org.apache.inlong.manager.workflow.event.process.ProcessEventListener.EXECUTOR_SERVICE; /** * Inlong group service layer implementation @@ -107,11 +124,19 @@ public class InlongGroupServiceImpl implements InlongGroupService { @Autowired private InlongStreamService streamService; @Autowired + private StreamSinkService streamSinkService; + @Autowired private StreamSourceEntityMapper streamSourceMapper; @Autowired + private TenantClusterTagEntityMapper tenantClusterTagMapper; + @Autowired private InlongStreamExtEntityMapper streamExtMapper; @Autowired private InlongClusterService clusterService; + @Autowired + private WorkflowService workflowService; + @Autowired + private InlongClusterEntityMapper clusterEntityMapper; @Autowired private InlongGroupOperatorFactory groupOperatorFactory; @@ -672,4 +697,143 @@ public class InlongGroupServiceImpl implements InlongGroupService { String.format("record has expired with record version=%d, request version=%d", entity.getVersion(), request.getVersion())); } + + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW) + public Boolean startTagSwitch(String groupId, String clusterTag) { + LOGGER.info("start to switch cluster tag for group={}, target tag={}", groupId, clusterTag); + + InlongGroupInfo groupInfo = this.get(groupId); + + // check if the group mode is data sync mode + if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) { + String errMSg = String.format("no need to switch sync mode group = {}", groupId); + 
LOGGER.error(errMSg); + throw new BusinessException(errMSg); + } + + // check if the group is under switching + List<InlongGroupExtInfo> groupExt = groupInfo.getExtList(); + Set<String> keys = groupExt.stream() + .map(InlongGroupExtInfo::getKeyName) + .collect(Collectors.toSet()); + + if (keys.contains(BACKUP_CLUSTER_TAG) || keys.contains(BACKUP_MQ_RESOURCE)) { + String errMsg = String.format("switch failed, current group is under switching, group=[%s]", groupId); + LOGGER.error(errMsg); + throw new BusinessException(errMsg); + } + + // check if the cluster tag is under current tenant + InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId); + if (groupEntity == null) { + LOGGER.error("inlong group not found by groupId={}", groupId); + throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND); + } + + TenantClusterTagEntity tenantClusterTag = + tenantClusterTagMapper.selectByUniqueKey(clusterTag, groupEntity.getTenant()); + if (tenantClusterTag == null) { + LOGGER.error("tenant cluster not found for tenant={}, clusterTag={}", groupEntity.getTenant(), clusterTag); + throw new BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND); + } + + // check if all sink related sort cluster has the target cluster tag + List<StreamSink> sinks = streamSinkService.listSink(groupEntity.getInlongGroupId(), null); + for (StreamSink sink : sinks) { + String clusterName = sink.getInlongClusterName(); + List<InlongClusterEntity> clusterEntity = + clusterEntityMapper.selectByKey(clusterTag, clusterName, + SinkType.relatedSortClusterType(sink.getSinkType())); + if (CollectionUtils.isEmpty(clusterEntity) || clusterEntity.size() != 1) { + String errMsg = String.format("find no cluster or multiple cluster with clusterName=[%s]", clusterName); + LOGGER.error(errMsg); + throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND, errMsg); + } + + } + + // config cluster tag and backup_cluster_tag + UserInfo userInfo = LoginUserUtils.getLoginUser(); + InlongGroupRequest 
request = groupInfo.genRequest(); + String oldClusterTag = request.getInlongClusterTag(); + request.setInlongClusterTag(clusterTag); + request.getExtList().add(new InlongGroupExtInfo(null, groupId, BACKUP_CLUSTER_TAG, oldClusterTag)); + request.getExtList().add(new InlongGroupExtInfo(null, groupId, BACKUP_MQ_RESOURCE, request.getMqResource())); + request.getExtList().add(new InlongGroupExtInfo(null, groupId, CLUSTER_SWITCH_TIME, + LocalDateTime.now().toString())); + this.update(request, userInfo.getName()); + + // trigger group workflow to rebuild configs + this.triggerWorkFlow(groupInfo, userInfo); + LOGGER.info("success to switch cluster tag for group={}, target tag={}", groupId, clusterTag); + return true; + } + + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW) + public Boolean finishTagSwitch(String groupId) { + LOGGER.info("start to finish switch cluster tag for group={}", groupId); + + InlongGroupInfo groupInfo = this.get(groupId); + UserInfo userInfo = LoginUserUtils.getLoginUser(); + + // check whether the current status supports modification + GroupStatus curStatus = GroupStatus.forCode(groupInfo.getStatus()); + if (GroupStatus.notAllowedUpdate(curStatus)) { + String errMsg = String.format("Current status=%s is not allowed to update", curStatus); + LOGGER.error(errMsg); + throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, errMsg); + } + + // check if the group is under switching + List<InlongGroupExtInfo> groupExt = groupInfo.getExtList(); + Map<String, InlongGroupExtInfo> extInfoMap = groupExt.stream() + .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, v -> v)); + + if (!extInfoMap.containsKey(BACKUP_CLUSTER_TAG) || !extInfoMap.containsKey(BACKUP_MQ_RESOURCE)) { + String errMsg = String.format("finish switch failed, current group is not under switching, group=[%s]", + groupId); + LOGGER.error(errMsg); + throw new BusinessException(errMsg); + 
} + + InlongGroupExtInfo switchTime = extInfoMap.get(CLUSTER_SWITCH_TIME); + LocalDateTime switchStartTime = + switchTime == null ? LocalDateTime.MIN : LocalDateTime.parse(switchTime.getKeyValue()); + + // check the switch time + LocalDateTime allowSwitchTime = switchStartTime.plusMinutes(FINISH_SWITCH_INTERVAL_MIN); + if (LocalDateTime.now().isBefore(allowSwitchTime)) { + String errMsg = String.format("finish switch failed, please retry until={}", allowSwitchTime); + LOGGER.error(errMsg); + throw new BusinessException(errMsg); + } + + // remove backup ext info + removeExt(extInfoMap.get(BACKUP_CLUSTER_TAG)); + removeExt(extInfoMap.get(BACKUP_MQ_RESOURCE)); + removeExt(extInfoMap.get(CLUSTER_SWITCH_TIME)); + + // trigger group workflow to rebuild configs + this.triggerWorkFlow(groupInfo, userInfo); + return true; + } + + private void triggerWorkFlow(InlongGroupInfo groupInfo, UserInfo userInfo) { + GroupResourceProcessForm processForm = new GroupResourceProcessForm(); + processForm.setGroupInfo(groupInfo); + List<InlongStreamInfo> streamList = streamService.list(groupInfo.getInlongGroupId()); + processForm.setStreamInfos(streamList); + EXECUTOR_SERVICE.execute( + () -> workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, processForm)); + } + + private void removeExt(InlongGroupExtInfo extInfo) { + if (extInfo == null || extInfo.getId() == null) { + return; + } + groupExtMapper.deleteByPrimaryKey(extInfo.getId()); + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java index 61187e6f8d..dd1c26d9f7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java +++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.service.resource.sink; import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.dao.entity.InlongGroupEntity; @@ -79,7 +80,7 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso private String assignFromRelated(String sinkType, String groupId) { InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId); - String sortClusterType = SORT_PREFIX.concat(sinkType); + String sortClusterType = SinkType.relatedSortClusterType(sinkType); List<InlongClusterEntity> clusters = clusterEntityMapper .selectByKey(null, null, sortClusterType).stream() .filter(cluster -> checkCluster(cluster.getClusterTags(), group.getInlongClusterTag())) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java index 2e49fd9380..0ad34ae506 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java @@ -40,6 +40,7 @@ import org.apache.inlong.manager.service.operationlog.OperationLog; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; @@ -206,4 +207,21 @@ public 
class InlongGroupController { return Response.success(groupService.detail(groupId)); } + @RequestMapping(value = "/group/switch/start/{groupId}/{clusterTag}", method = RequestMethod.GET) + @ApiOperation(value = "start tag switch") + @ApiImplicitParams({ + @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true), + @ApiImplicitParam(name = "clusterTag", value = "cluster tag", dataTypeClass = String.class, required = true) + }) + public Response<Boolean> startTagSwitch(@PathVariable String groupId, @PathVariable String clusterTag) { + return Response.success(groupService.startTagSwitch(groupId, clusterTag)); + } + + @RequestMapping(value = "/group/switch/finish/{groupId}", method = RequestMethod.GET) + @ApiOperation(value = "finish tag switch") + @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) + public Response<Boolean> finishTagSwitch(@PathVariable String groupId) { + return Response.success(groupService.finishTagSwitch(groupId)); + } + } \ No newline at end of file