This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 2db5c9b25 [INLONG-7067][Manager] Provide MQ cluster info in consumption details (#7068) 2db5c9b25 is described below commit 2db5c9b2521fad9c9cf726b3c34380c79950873c Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Dec 27 12:13:15 2022 +0800 [INLONG-7067][Manager] Provide MQ cluster info in consumption details (#7068) --- .../manager/common/consts/InlongConstants.java | 2 ++ .../manager/pojo/consume/InlongConsumeInfo.java | 5 ++++ .../service/consume/AbstractConsumeOperator.java | 5 ++++ .../service/consume/ConsumePulsarOperator.java | 33 ++++++++++++++++++---- 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index c41299faa..8a49627bb 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -35,6 +35,8 @@ public class InlongConstants { */ public static final String COMMA = ","; + public static final String SLASH = "/"; + public static final String COLON = ":"; public static final String SEMICOLON = ";"; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java index 48a48bed3..a8c2d3816 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java @@ -25,8 +25,10 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import java.util.Date; +import java.util.List; /** * Base inlong consume info @@ -87,6 +89,9 @@ public abstract class InlongConsumeInfo extends BaseInlongConsume { @ApiModelProperty(value = "Version number") private Integer version; + @ApiModelProperty(value = "MQ cluster info list") + private List<? extends ClusterInfo> clusterInfos; + public abstract InlongConsumeRequest genRequest(); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java index 8840701bc..52a0439de 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.consume; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ConsumeStatus; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; @@ -69,6 +70,10 @@ public abstract class AbstractConsumeOperator implements InlongConsumeOperator { @Override @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) public void updateOpt(InlongConsumeRequest request, String operator) { + // firstly check the topic info + if (StringUtils.isNotBlank(request.getTopic())) { + this.checkTopicInfo(request); + } // get the entity from request InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new); // set the ext params diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java index 16bb1c84e..6edc3a243 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java @@ -19,19 +19,23 @@ package org.apache.inlong.manager.service.consume; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.common.constant.MQType; +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongConsumeEntity; import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; +import org.apache.inlong.manager.pojo.cluster.ClusterInfo; +import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO; import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo; import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest; +import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo; import org.apache.inlong.manager.service.cluster.InlongClusterService; @@ -40,6 +44,8 @@ import org.apache.inlong.manager.service.stream.InlongStreamService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; + /** * Inlong consume operator for Pulsar. */ @@ -85,12 +91,12 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator { // check the origin topic from request exists InlongPulsarTopicInfo pulsarTopic = (InlongPulsarTopicInfo) topicInfo; String originTopic = request.getTopic(); + if (originTopic.startsWith("persistent")) { + originTopic = originTopic.substring(originTopic.lastIndexOf(InlongConstants.SLASH) + 1); + request.setTopic(originTopic); + } Preconditions.checkTrue(pulsarTopic.getTopics().contains(originTopic), "Pulsar topic not exist for " + originTopic); - - // format the topic to 'tenant/namespace/topic' - request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, - pulsarTopic.getTenant(), pulsarTopic.getNamespace(), originTopic)); } @Override @@ -103,7 +109,14 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator { ConsumePulsarDTO dto = ConsumePulsarDTO.getFromJson(entity.getExtParams()); CommonBeanUtils.copyProperties(dto, consumeInfo); } - + String groupId = entity.getInlongGroupId(); + InlongGroupInfo groupInfo = groupService.get(groupId); + String clusterTag = groupInfo.getInlongClusterTag(); + List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); + Preconditions.checkNotEmpty(clusterInfos, "pulsar cluster not exist for groupId=" + groupId); + consumeInfo.setClusterInfos(clusterInfos); + PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfos.get(0); + consumeInfo.setTopic(getFullPulsarTopic(groupInfo, pulsarCluster.getTenant(), entity.getTopic())); return consumeInfo; } @@ -147,4 +160,12 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator { } } + private String getFullPulsarTopic(InlongGroupInfo groupInfo, String tenant, String topic) { + if (StringUtils.isEmpty(tenant)) { + tenant = InlongConstants.DEFAULT_PULSAR_TENANT; + } + String namespace = groupInfo.getMqResource(); + return String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant, namespace, topic); + } + }