This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db5c9b25 [INLONG-7067][Manager] Provide MQ cluster info in consumption details (#7068)
2db5c9b25 is described below

commit 2db5c9b2521fad9c9cf726b3c34380c79950873c
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Dec 27 12:13:15 2022 +0800

    [INLONG-7067][Manager] Provide MQ cluster info in consumption details (#7068)
---
 .../manager/common/consts/InlongConstants.java     |  2 ++
 .../manager/pojo/consume/InlongConsumeInfo.java    |  5 ++++
 .../service/consume/AbstractConsumeOperator.java   |  5 ++++
 .../service/consume/ConsumePulsarOperator.java     | 33 ++++++++++++++++++----
 4 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c41299faa..8a49627bb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -35,6 +35,8 @@ public class InlongConstants {
      */
     public static final String COMMA = ",";
 
+    public static final String SLASH = "/";
+
     public static final String COLON = ":";
 
     public static final String SEMICOLON = ";";
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
index 48a48bed3..a8c2d3816 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
@@ -25,8 +25,10 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 
 import java.util.Date;
+import java.util.List;
 
 /**
  * Base inlong consume info
@@ -87,6 +89,9 @@ public abstract class InlongConsumeInfo extends 
BaseInlongConsume {
     @ApiModelProperty(value = "Version number")
     private Integer version;
 
+    @ApiModelProperty(value = "MQ cluster info list")
+    private List<? extends ClusterInfo> clusterInfos;
+
     public abstract InlongConsumeRequest genRequest();
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
index 8840701bc..52a0439de 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.consume;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -69,6 +70,10 @@ public abstract class AbstractConsumeOperator implements 
InlongConsumeOperator {
     @Override
     @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
     public void updateOpt(InlongConsumeRequest request, String operator) {
+        // firstly check the topic info
+        if (StringUtils.isNotBlank(request.getTopic())) {
+            this.checkTopicInfo(request);
+        }
         // get the entity from request
         InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request, 
InlongConsumeEntity::new);
         // set the ext params
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
index 16bb1c84e..6edc3a243 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -19,19 +19,23 @@ package org.apache.inlong.manager.service.consume;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
 import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
 import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
@@ -40,6 +44,8 @@ import 
org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+
 /**
  * Inlong consume operator for Pulsar.
  */
@@ -85,12 +91,12 @@ public class ConsumePulsarOperator extends 
AbstractConsumeOperator {
         // check the origin topic from request exists
         InlongPulsarTopicInfo pulsarTopic = (InlongPulsarTopicInfo) topicInfo;
         String originTopic = request.getTopic();
+        if (originTopic.startsWith("persistent")) {
+            originTopic = 
originTopic.substring(originTopic.lastIndexOf(InlongConstants.SLASH) + 1);
+            request.setTopic(originTopic);
+        }
         Preconditions.checkTrue(pulsarTopic.getTopics().contains(originTopic),
                 "Pulsar topic not exist for " + originTopic);
-
-        // format the topic to 'tenant/namespace/topic'
-        request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
-                pulsarTopic.getTenant(), pulsarTopic.getNamespace(), 
originTopic));
     }
 
     @Override
@@ -103,7 +109,14 @@ public class ConsumePulsarOperator extends 
AbstractConsumeOperator {
             ConsumePulsarDTO dto = 
ConsumePulsarDTO.getFromJson(entity.getExtParams());
             CommonBeanUtils.copyProperties(dto, consumeInfo);
         }
-
+        String groupId = entity.getInlongGroupId();
+        InlongGroupInfo groupInfo = groupService.get(groupId);
+        String clusterTag = groupInfo.getInlongClusterTag();
+        List<ClusterInfo> clusterInfos = 
clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        Preconditions.checkNotEmpty(clusterInfos, "pulsar cluster not exist 
for groupId=" + groupId);
+        consumeInfo.setClusterInfos(clusterInfos);
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) 
clusterInfos.get(0);
+        consumeInfo.setTopic(getFullPulsarTopic(groupInfo, 
pulsarCluster.getTenant(), entity.getTopic()));
         return consumeInfo;
     }
 
@@ -147,4 +160,12 @@ public class ConsumePulsarOperator extends 
AbstractConsumeOperator {
         }
     }
 
+    private String getFullPulsarTopic(InlongGroupInfo groupInfo, String 
tenant, String topic) {
+        if (StringUtils.isEmpty(tenant)) {
+            tenant = InlongConstants.DEFAULT_PULSAR_TENANT; // no tenant supplied: fall back to the default tenant constant
+        }
+        String namespace = groupInfo.getMqResource(); // the group's MQ resource is used as the namespace part
+        return String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant, 
namespace, topic); // presumably yields 'tenant/namespace/topic' - confirm against PULSAR_TOPIC_FORMAT
+    }
+
 }

Reply via email to