This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d853493e4 [INLONG-9314][Manager] Support cluster switch for 
InlongGroup (#9353)
8d853493e4 is described below

commit 8d853493e42f8144257c3caeb6df5d1777df714b
Author: vernedeng <verned...@apache.org>
AuthorDate: Tue Dec 5 09:35:15 2023 +0800

    [INLONG-9314][Manager] Support cluster switch for InlongGroup (#9353)
---
 .../inlong/common/constant/ClusterSwitch.java      |   7 +
 .../inlong/manager/common/consts/SinkType.java     |  15 ++
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   2 +
 .../dao/mapper/TenantClusterTagEntityMapper.java   |   2 +
 .../mappers/TenantClusterTagEntityMapper.xml       |   8 +
 .../manager/service/group/InlongGroupService.java  |   4 +
 .../service/group/InlongGroupServiceImpl.java      | 164 +++++++++++++++++++++
 .../AbstractStandaloneSinkResourceOperator.java    |   3 +-
 .../web/controller/InlongGroupController.java      |  18 +++
 9 files changed, 222 insertions(+), 1 deletion(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
index 90816730ce..847162da1d 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
@@ -31,4 +31,11 @@ public class ClusterSwitch {
      * MQ resource for backup, represents the namespace of Pulsar, the topic 
of TubeMQ, etc.
      */
     public static final String BACKUP_MQ_RESOURCE = "backup_mq_resource";
+
+    /**
+     * Cluster switch start time
+     */
+    public static final String CLUSTER_SWITCH_TIME = "cluster_switch_time";
+
+    /**
+     * Number of minutes added to the cluster switch start time before the switch is allowed to be finished
+     */
+    public static final int FINISH_SWITCH_INTERVAL_MIN = 10;
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index b9b8a17350..68e80e7d07 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -17,9 +17,13 @@
 
 package org.apache.inlong.manager.common.consts;
 
+import org.apache.inlong.manager.common.enums.ClusterType;
+
 import java.lang.reflect.Field;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -74,10 +78,18 @@ public class SinkType extends StreamType {
     @SupportSortType(sortType = SortType.SORT_STANDALONE)
     public static final String CLS = "CLS";
 
+    public static final Map<String, String> SINK_TO_CLUSTER = new HashMap<>();
+
     public static final Set<String> SORT_FLINK_SINK = new HashSet<>();
 
     public static final Set<String> SORT_STANDALONE_SINK = new HashSet<>();
 
+    // Populate SINK_TO_CLUSTER: map the CLS, ELASTICSEARCH and PULSAR sink types to their sort cluster types
+    static {
+        SINK_TO_CLUSTER.put(CLS, ClusterType.SORT_CLS);
+        SINK_TO_CLUSTER.put(ELASTICSEARCH, ClusterType.SORT_ES);
+        SINK_TO_CLUSTER.put(PULSAR, ClusterType.SORT_PULSAR);
+    }
+
     static {
         SinkType obj = new SinkType();
         Class<? extends SinkType> clazz = obj.getClass();
@@ -98,4 +110,7 @@ public class SinkType extends StreamType {
         return sinkTypes.stream().anyMatch(SORT_FLINK_SINK::contains);
     }
 
+    /**
+     * Look up the sort cluster type registered in {@link #SINK_TO_CLUSTER} for the given sink type.
+     *
+     * @param sinkType sink type used as the lookup key
+     * @return the mapped sort cluster type, or {@code null} if the sink type has no entry in the map
+     */
+    public static String relatedSortClusterType(String sinkType) {
+        final String sortClusterType = SINK_TO_CLUSTER.get(sinkType);
+        return sortClusterType;
+    }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 289ddba60d..9d20be566d 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -68,6 +68,8 @@ public enum ErrorCodeEnum {
     CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"),
     CLUSTER_TAG_NOT_FOUND(1104, "Cluster tag information does not exist"),
 
+    TENANT_CLUSTER_TAG_NOT_FOUND(1105, "Tenant Cluster tag does not exist"),
+
     DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"),
     DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"),
     DATA_NODE_ID_CHANGED(1152, "Data node information's id not equals"),
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java
index 95d4beaa46..f9b34f2524 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/TenantClusterTagEntityMapper.java
@@ -34,6 +34,8 @@ public interface TenantClusterTagEntityMapper {
 
     TenantClusterTagEntity selectByPrimaryKey(Integer id);
 
+    /**
+     * Select the non-deleted tenant cluster tag record for the given cluster tag and tenant.
+     * <p/>
+     * NOTE(review): the mapper XML binds #{clusterTag} and #{tenant} by name, but the two parameters
+     * carry no @Param annotation - confirm parameter names are available at runtime for this build.
+     *
+     * @param clusterTag cluster tag
+     * @param tenant tenant name
+     * @return the matching record
+     */
+    TenantClusterTagEntity selectByUniqueKey(String clusterTag, String tenant);
+
     List<TenantClusterTagEntity> selectByTag(String clusterTag);
 
     List<TenantClusterTagEntity> selectByCondition(TenantClusterTagPageRequest 
request);
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml
index 8e9e78dd22..3de752e8ca 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/TenantClusterTagEntityMapper.xml
@@ -41,6 +41,14 @@
         from tenant_cluster_tag
         where id = #{id,jdbcType=INTEGER}
     </select>
+    <!-- Select the record with the given tenant and cluster_tag that is not marked as deleted -->
+    <select id="selectByUniqueKey" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List" />
+        from tenant_cluster_tag
+        where tenant = #{tenant,jdbcType=VARCHAR}
+        and cluster_tag = #{clusterTag,jdbcType=VARCHAR}
+        and is_deleted = 0
+    </select>
     <select id="selectByTag" parameterType="java.lang.String" 
resultMap="BaseResultMap">
         select
         <include refid="Base_Column_List" />
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index c344548ddb..d2f8b09048 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -207,4 +207,8 @@ public interface InlongGroupService {
      */
     Map<String, Object> detail(String groupId);
 
+    /**
+     * Start switching the inlong group to the target cluster tag.
+     *
+     * @param groupId inlong group id
+     * @param clusterTag target cluster tag
+     * @return whether succeed
+     */
+    Boolean startTagSwitch(String groupId, String clusterTag);
+
+    /**
+     * Finish the cluster tag switch of the inlong group.
+     *
+     * @param groupId inlong group id
+     * @return whether succeed
+     */
+    Boolean finishTagSwitch(String groupId);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 62f9ee7da3..2d87b74995 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -20,21 +20,27 @@ package org.apache.inlong.manager.service.group;
 import org.apache.inlong.manager.common.auth.Authentication.AuthType;
 import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.entity.TenantClusterTagEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
@@ -48,16 +54,22 @@ import 
org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sort.BaseSortConf;
 import org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType;
 import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
 import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
 import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.pojo.user.UserInfo;
+import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.source.SourceOperatorFactory;
 import org.apache.inlong.manager.service.source.StreamSourceOperator;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.apache.inlong.manager.service.workflow.WorkflowService;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.github.pagehelper.Page;
@@ -76,6 +88,7 @@ import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.validation.annotation.Validated;
 
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -86,7 +99,11 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_CLUSTER_TAG;
+import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
+import static 
org.apache.inlong.common.constant.ClusterSwitch.CLUSTER_SWITCH_TIME;
+import static 
org.apache.inlong.common.constant.ClusterSwitch.FINISH_SWITCH_INTERVAL_MIN;
 import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
+import static 
org.apache.inlong.manager.workflow.event.process.ProcessEventListener.EXECUTOR_SERVICE;
 
 /**
  * Inlong group service layer implementation
@@ -107,11 +124,19 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
     @Autowired
     private InlongStreamService streamService;
     @Autowired
+    private StreamSinkService streamSinkService;
+    @Autowired
     private StreamSourceEntityMapper streamSourceMapper;
     @Autowired
+    private TenantClusterTagEntityMapper tenantClusterTagMapper;
+    @Autowired
     private InlongStreamExtEntityMapper streamExtMapper;
     @Autowired
     private InlongClusterService clusterService;
+    @Autowired
+    private WorkflowService workflowService;
+    @Autowired
+    private InlongClusterEntityMapper clusterEntityMapper;
 
     @Autowired
     private InlongGroupOperatorFactory groupOperatorFactory;
@@ -672,4 +697,143 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
                 String.format("record has expired with record version=%d, 
request version=%d",
                         entity.getVersion(), request.getVersion()));
     }
+
+    /**
+     * Start switching the inlong group to the target cluster tag.
+     * <p/>
+     * The current cluster tag and MQ resource are stored as backup ext info together with the switch
+     * start time, the group is updated to the target cluster tag, and the group resource workflow is
+     * triggered to rebuild the configs.
+     *
+     * @param groupId inlong group id
+     * @param clusterTag target cluster tag
+     * @return {@code true} once the switch has been started
+     * @throws BusinessException if the group is in data sync mode, is already under switching, is not
+     *         found, the cluster tag is not found under the group's tenant, or a sink's sort cluster
+     *         is not uniquely found under the target cluster tag
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+            propagation = Propagation.REQUIRES_NEW)
+    public Boolean startTagSwitch(String groupId, String clusterTag) {
+        LOGGER.info("start to switch cluster tag for group={}, target tag={}", groupId, clusterTag);
+
+        InlongGroupInfo groupInfo = this.get(groupId);
+
+        // check if the group mode is data sync mode
+        if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+            // String.format needs %s; an SLF4J-style {} placeholder would be emitted literally
+            String errMsg = String.format("no need to switch sync mode group = %s", groupId);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        // check if the group is under switching
+        List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+        Set<String> keys = groupExt.stream()
+                .map(InlongGroupExtInfo::getKeyName)
+                .collect(Collectors.toSet());
+
+        if (keys.contains(BACKUP_CLUSTER_TAG) || keys.contains(BACKUP_MQ_RESOURCE)) {
+            String errMsg = String.format("switch failed, current group is under switching, group=[%s]", groupId);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        // check if the cluster tag is under current tenant
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        if (groupEntity == null) {
+            LOGGER.error("inlong group not found by groupId={}", groupId);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+
+        TenantClusterTagEntity tenantClusterTag =
+                tenantClusterTagMapper.selectByUniqueKey(clusterTag, groupEntity.getTenant());
+        if (tenantClusterTag == null) {
+            LOGGER.error("tenant cluster not found for tenant={}, clusterTag={}",
+                    groupEntity.getTenant(), clusterTag);
+            throw new BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
+        }
+
+        // check if all sink related sort cluster has the target cluster tag
+        List<StreamSink> sinks = streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
+        for (StreamSink sink : sinks) {
+            String clusterName = sink.getInlongClusterName();
+            List<InlongClusterEntity> clusterEntity =
+                    clusterEntityMapper.selectByKey(clusterTag, clusterName,
+                            SinkType.relatedSortClusterType(sink.getSinkType()));
+            // exactly one cluster must match, otherwise the switch target is ambiguous or missing
+            if (CollectionUtils.isEmpty(clusterEntity) || clusterEntity.size() != 1) {
+                String errMsg = String.format("find no cluster or multiple cluster with clusterName=[%s]",
+                        clusterName);
+                LOGGER.error(errMsg);
+                throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND, errMsg);
+            }
+        }
+
+        // config cluster tag and backup_cluster_tag
+        UserInfo userInfo = LoginUserUtils.getLoginUser();
+        InlongGroupRequest request = groupInfo.genRequest();
+        String oldClusterTag = request.getInlongClusterTag();
+        request.setInlongClusterTag(clusterTag);
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, BACKUP_CLUSTER_TAG, oldClusterTag));
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, BACKUP_MQ_RESOURCE,
+                request.getMqResource()));
+        request.getExtList().add(new InlongGroupExtInfo(null, groupId, CLUSTER_SWITCH_TIME,
+                LocalDateTime.now().toString()));
+        this.update(request, userInfo.getName());
+
+        // trigger group workflow to rebuild configs
+        // NOTE(review): groupInfo was loaded before the update above - confirm the workflow does not
+        // rely on the cluster tag / ext list carried by this object
+        this.triggerWorkFlow(groupInfo, userInfo);
+        LOGGER.info("success to switch cluster tag for group={}, target tag={}", groupId, clusterTag);
+        return true;
+    }
+
+    /**
+     * Finish the cluster tag switch of the inlong group.
+     * <p/>
+     * The backup cluster tag, backup MQ resource and switch time ext info are removed and the group
+     * resource workflow is triggered to rebuild the configs. Finishing is rejected until
+     * FINISH_SWITCH_INTERVAL_MIN minutes have passed since the recorded switch start time; if no
+     * start time is recorded, the interval check always passes.
+     *
+     * @param groupId inlong group id
+     * @return {@code true} once the switch has been finished
+     * @throws BusinessException if the group status does not allow updates, the group is not under
+     *         switching, or the minimum switch interval has not elapsed yet
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+            propagation = Propagation.REQUIRES_NEW)
+    public Boolean finishTagSwitch(String groupId) {
+        LOGGER.info("start to finish switch cluster tag for group={}", groupId);
+
+        InlongGroupInfo groupInfo = this.get(groupId);
+        UserInfo userInfo = LoginUserUtils.getLoginUser();
+
+        // check whether the current status supports modification
+        GroupStatus curStatus = GroupStatus.forCode(groupInfo.getStatus());
+        if (GroupStatus.notAllowedUpdate(curStatus)) {
+            String errMsg = String.format("Current status=%s is not allowed to update", curStatus);
+            LOGGER.error(errMsg);
+            throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, errMsg);
+        }
+
+        // check if the group is under switching
+        List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
+        Map<String, InlongGroupExtInfo> extInfoMap = groupExt.stream()
+                .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, v -> v));
+
+        if (!extInfoMap.containsKey(BACKUP_CLUSTER_TAG) || !extInfoMap.containsKey(BACKUP_MQ_RESOURCE)) {
+            String errMsg = String.format("finish switch failed, current group is not under switching, group=[%s]",
+                    groupId);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        // a missing switch time falls back to LocalDateTime.MIN, so the interval check below passes
+        InlongGroupExtInfo switchTime = extInfoMap.get(CLUSTER_SWITCH_TIME);
+        LocalDateTime switchStartTime =
+                switchTime == null ? LocalDateTime.MIN : LocalDateTime.parse(switchTime.getKeyValue());
+
+        // check the switch time
+        LocalDateTime allowSwitchTime = switchStartTime.plusMinutes(FINISH_SWITCH_INTERVAL_MIN);
+        if (LocalDateTime.now().isBefore(allowSwitchTime)) {
+            // String.format needs %s; an SLF4J-style {} placeholder would be emitted literally
+            String errMsg = String.format("finish switch failed, please retry until=%s", allowSwitchTime);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        // remove backup ext info
+        removeExt(extInfoMap.get(BACKUP_CLUSTER_TAG));
+        removeExt(extInfoMap.get(BACKUP_MQ_RESOURCE));
+        removeExt(extInfoMap.get(CLUSTER_SWITCH_TIME));
+
+        // trigger group workflow to rebuild configs
+        // NOTE(review): groupInfo was loaded before the ext info was removed - confirm the workflow
+        // does not rely on the ext list carried by this object
+        this.triggerWorkFlow(groupInfo, userInfo);
+        return true;
+    }
+
+    /**
+     * Build a group resource process form from the group info and its streams, then submit the
+     * CREATE_GROUP_RESOURCE process to the shared executor.
+     *
+     * @param groupInfo group info placed into the process form
+     * @param userInfo user passed to the workflow service when starting the process
+     */
+    private void triggerWorkFlow(InlongGroupInfo groupInfo, UserInfo userInfo) {
+        final GroupResourceProcessForm form = new GroupResourceProcessForm();
+        form.setGroupInfo(groupInfo);
+        form.setStreamInfos(streamService.list(groupInfo.getInlongGroupId()));
+
+        final Runnable startProcess =
+                () -> workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, form);
+        EXECUTOR_SERVICE.execute(startProcess);
+    }
+
+    /**
+     * Delete the given ext info by its primary key; does nothing when the ext info or its id is null.
+     *
+     * @param extInfo ext info to delete, may be null
+     */
+    private void removeExt(InlongGroupExtInfo extInfo) {
+        boolean hasId = extInfo != null && extInfo.getId() != null;
+        if (hasId) {
+            groupExtMapper.deleteByPrimaryKey(extInfo.getId());
+        }
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
index 61187e6f8d..dd1c26d9f7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/AbstractStandaloneSinkResourceOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.resource.sink;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -79,7 +80,7 @@ public abstract class AbstractStandaloneSinkResourceOperator 
implements SinkReso
 
     private String assignFromRelated(String sinkType, String groupId) {
         InlongGroupEntity group = groupEntityMapper.selectByGroupId(groupId);
-        String sortClusterType = SORT_PREFIX.concat(sinkType);
+        String sortClusterType = SinkType.relatedSortClusterType(sinkType);
         List<InlongClusterEntity> clusters = clusterEntityMapper
                 .selectByKey(null, null, sortClusterType).stream()
                 .filter(cluster -> checkCluster(cluster.getClusterTags(), 
group.getInlongClusterTag()))
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 2e49fd9380..0ad34ae506 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -40,6 +40,7 @@ import 
org.apache.inlong.manager.service.operationlog.OperationLog;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
@@ -206,4 +207,21 @@ public class InlongGroupController {
         return Response.success(groupService.detail(groupId));
     }
 
+    // Delegates to the group service to start the cluster tag switch and wraps the result.
+    // NOTE(review): mapped to GET although the service call updates the group - confirm this is intended.
+    @RequestMapping(value = "/group/switch/start/{groupId}/{clusterTag}", method = RequestMethod.GET)
+    @ApiOperation(value = "start tag switch")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "groupId", value = "Inlong group id",
+                    dataTypeClass = String.class, required = true),
+            @ApiImplicitParam(name = "clusterTag", value = "cluster tag",
+                    dataTypeClass = String.class, required = true)
+    })
+    public Response<Boolean> startTagSwitch(@PathVariable String groupId, @PathVariable String clusterTag) {
+        Boolean started = groupService.startTagSwitch(groupId, clusterTag);
+        return Response.success(started);
+    }
+
+    // Delegates to the group service to finish the cluster tag switch and wraps the result.
+    // NOTE(review): mapped to GET although the service call deletes ext info - confirm this is intended.
+    @RequestMapping(value = "/group/switch/finish/{groupId}", method = RequestMethod.GET)
+    @ApiOperation(value = "finish tag switch")
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id",
+            dataTypeClass = String.class, required = true)
+    public Response<Boolean> finishTagSwitch(@PathVariable String groupId) {
+        Boolean finished = groupService.finishTagSwitch(groupId);
+        return Response.success(finished);
+    }
+
 }
\ No newline at end of file

Reply via email to