This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 61ce6e296a0fce5cec2fc4c4adc9b7761340e1ef Author: healchow <healc...@gmail.com> AuthorDate: Sun Jan 8 22:28:56 2023 +0800 [INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar (#7171) --- .../apache/inlong/manager/client/BaseExample.java | 2 - .../apache/inlong/manager/client/ut/BaseTest.java | 2 - .../client/api/inner/ClientFactoryTest.java | 1 - .../resources/mappers/InlongGroupEntityMapper.xml | 7 +- .../pojo/cluster/kafka/KafkaClusterDTO.java | 15 ++- .../pojo/cluster/pulsar/PulsarClusterDTO.java | 17 +++- .../pojo/cluster/pulsar/PulsarClusterInfo.java | 4 +- .../pojo/cluster/pulsar/PulsarClusterRequest.java | 3 +- .../manager/pojo/group/InlongGroupBriefInfo.java | 5 +- .../manager/pojo/group/pulsar/InlongPulsarDTO.java | 5 + .../pojo/group/pulsar/InlongPulsarInfo.java | 6 -- .../pojo/group/pulsar/InlongPulsarRequest.java | 7 ++ .../service/cluster/InlongClusterServiceImpl.java | 25 ++--- .../service/cluster/KafkaClusterOperator.java | 2 - .../service/cluster/PulsarClusterOperator.java | 2 - .../service/consume/ConsumePulsarOperator.java | 14 ++- .../service/core/impl/AgentServiceImpl.java | 105 ++++++++++----------- .../service/group/InlongGroupOperator4Pulsar.java | 9 +- .../apply/ApproveConsumeProcessListener.java | 18 ++-- .../queue/pulsar/PulsarResourceOperator.java | 42 +++++---- .../source/pulsar/PulsarSourceOperator.java | 15 +-- .../service/cluster/InlongClusterServiceTest.java | 1 - .../service/consume/InlongConsumeServiceTest.java | 2 - 23 files changed, 170 insertions(+), 139 deletions(-) diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java index 9e1ddfaf2..dfa4cc6ee 100644 --- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java +++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java @@ -71,8 +71,6 @@ public class BaseExample { pulsarInfo.setInCharges("admin"); // pulsar conf - pulsarInfo.setServiceUrl(pulsarServiceUrl); - pulsarInfo.setAdminUrl(pulsarAdminUrl); pulsarInfo.setTenant(tenant); pulsarInfo.setMqResource(namespace); diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java index 1818f657d..0a46f939c 100644 --- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java +++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java @@ -103,8 +103,6 @@ public class BaseTest { pulsarInfo.setInCharges(IN_CHARGES); // pulsar conf - pulsarInfo.setServiceUrl(PULSAR_SERVICE_URL); - pulsarInfo.setAdminUrl(PULSAR_ADMIN_URL); pulsarInfo.setTenant(TENANT); pulsarInfo.setMqResource(NAMESPACE); diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java index 03eb5cfee..bb1984936 100644 --- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java +++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java @@ -717,7 +717,6 @@ class ClientFactoryTest { .clusterTags("test_cluster_tag") .type(ClusterType.PULSAR) .adminUrl("http://127.0.0.1:8080") - .tenant("public") .build(); stubFor( diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml index c99ed7595..77cbd6246 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml @@ -166,7 +166,8 @@ </select> <select id="selectBriefList" parameterType="org.apache.inlong.manager.pojo.group.InlongGroupPageRequest" resultType="org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo"> - select id, inlong_group_id, name, mq_type, mq_resource, inlong_cluster_tag, modify_time + select id, inlong_group_id, name, mq_type, mq_resource, dataReportType, inlong_cluster_tag, ext_params, + in_charges, status, creator, modifier, create_time, modify_time from inlong_group <where> is_deleted = 0 @@ -189,10 +190,8 @@ </foreach> </if> </where> - order by modify_time desc - limit 100 </select> - <select id="selectByTopicRequest" resultType="org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest"> + <select id="selectByTopicRequest" resultType="org.apache.inlong.manager.dao.entity.InlongGroupEntity"> select <include refid="Base_Column_List"/> from inlong_group diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java index 9f5e25ca4..d6110671d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.cluster.kafka; import com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -39,17 +40,25 @@ import javax.validation.constraints.NotNull; @ApiModel("Kafka cluster info") public class KafkaClusterDTO { - @Builder.Default - private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler"; - + /** + * Repeated save to ext_params field, it is convenient for DataProxy to obtain. + */ @JsonProperty("bootstrap.servers") + @ApiModelProperty(value = "Kafka bootstrap servers' URL, is the 'url' field of the cluster") private String bootstrapServers; + /** + * Saved to ext_params field, it is convenient for DataProxy to obtain. + */ + @Builder.Default + private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler"; + /** * Get the dto instance from the request */ public static KafkaClusterDTO getFromRequest(KafkaClusterRequest request) { return KafkaClusterDTO.builder() + .bootstrapServers(request.getUrl()) .build(); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java index d80d3da2b..3fa402d15 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java @@ -39,24 +39,31 @@ import javax.validation.constraints.NotNull; @ApiModel("Pulsar cluster info") public class PulsarClusterDTO { - @ApiModelProperty(value = "Pulsar admin URL, such as: http://127.0.0.1:8080", notes = "Pulsar service URL is the 'url' field of the cluster") + @ApiModelProperty(value = "Pulsar admin URL, such as: http://127.0.0.1:8080") private String adminUrl; + /** + * Repeated save to ext_params field, it is convenient for DataProxy to obtain. + */ + @ApiModelProperty(value = "Pulsar service URL, is the 'url' field of the cluster") + private String serviceUrl; + @ApiModelProperty(value = "Pulsar tenant, default is 'public'") - @Builder.Default - private String tenant = "public"; + private String tenant; + /** + * Saved to ext_params field, it is convenient for DataProxy to obtain. + */ @Builder.Default private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler"; - private String serviceUrl; - /** * Get the dto instance from the request */ public static PulsarClusterDTO getFromRequest(PulsarClusterRequest request) { return PulsarClusterDTO.builder() .adminUrl(request.getAdminUrl()) + .serviceUrl(request.getUrl()) .tenant(request.getTenant()) .build(); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java index 58049ab66..e6b4a3d7f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java @@ -19,7 +19,6 @@ package org.apache.inlong.manager.pojo.cluster.pulsar; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; -import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -44,8 +43,7 @@ public class PulsarClusterInfo extends ClusterInfo { private String adminUrl; @ApiModelProperty(value = "Pulsar tenant, default is 'public'") - @Builder.Default - private String tenant = "public"; + private String tenant; public PulsarClusterInfo() { this.setType(ClusterType.PULSAR); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java index c3d913620..170fe2036 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java @@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.util.JsonTypeDefine; import org.apache.inlong.manager.pojo.cluster.ClusterRequest; @@ -40,7 +41,7 @@ public class PulsarClusterRequest extends ClusterRequest { private String adminUrl; @ApiModelProperty(value = "Pulsar tenant, default is 'public'") - private String tenant = "public"; + private String tenant = InlongConstants.DEFAULT_PULSAR_TENANT; public PulsarClusterRequest() { this.setType(ClusterType.PULSAR); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java index c7d196b25..29c7418f2 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java @@ -54,9 +54,12 @@ public class InlongGroupBriefInfo { @ApiModelProperty(value = "MQ resource") private String mqResource; - @ApiModelProperty(value = "Inlong cluster tag") + @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table") private String inlongClusterTag; + @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table") + private String extParams; + @ApiModelProperty(value = "Name of responsible person, separated by commas") private String inCharges; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java index e42ae8837..7f0deddd1 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java @@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; @@ -38,9 +39,13 @@ import javax.validation.constraints.NotNull; @Builder @NoArgsConstructor @AllArgsConstructor +@EqualsAndHashCode(callSuper = true) @ApiModel("Inlong group info for Pulsar") public class InlongPulsarDTO extends BaseInlongGroup { + @ApiModelProperty(value = "Pulsar tenant") + private String tenant; + @ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;" + "serial: single partition, low throughput, and orderly messages") private String queueModule; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java index 36e07e5a6..451cccbfc 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java @@ -42,12 +42,6 @@ public class InlongPulsarInfo extends InlongGroupInfo { @ApiModelProperty(value = "Pulsar tenant") private String tenant; - @ApiModelProperty(value = "Pulsar admin URL") - private String adminUrl; - - @ApiModelProperty(value = "Pulsar service URL") - private String serviceUrl; - @ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;" + "serial: single partition, low throughput, and orderly messages") private String queueModule = "PARALLEL"; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java index 986306665..7da1b625e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java @@ -36,6 +36,13 @@ import org.apache.inlong.manager.pojo.group.InlongGroupRequest; @JsonTypeDefine(value = MQType.PULSAR) public class InlongPulsarRequest extends InlongGroupRequest { + /** + * TODO Add default value InlongConstants.DEFAULT_PULSAR_TENANT when you remove the 'tenant' + * from {@link org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest} + */ + @ApiModelProperty(value = "Pulsar tenant") + private String tenant; + @ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;" + "serial: single partition, low throughput, and orderly messages") private String queueModule = "PARALLEL"; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index 598dd10a0..6ddd96575 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -66,6 +66,7 @@ import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo; import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; import org.apache.inlong.manager.pojo.user.UserInfo; import org.apache.inlong.manager.service.repository.DataProxyConfigRepository; @@ -1482,7 +1483,7 @@ public class InlongClusterServiceImpl implements InlongClusterService { return result; } - LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated group num={}", + LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated InlongGroup num={}", clusterTagList, groupList.size()); List<DataProxyTopicInfo> topicList = new ArrayList<>(); for (InlongGroupBriefInfo groupInfo : groupList) { @@ -1492,27 +1493,29 @@ public class InlongClusterServiceImpl implements InlongClusterService { String mqType = groupInfo.getMqType(); if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) { - List<InlongStreamBriefInfo> streamList = streamMapper.selectBriefList(groupId); - for (InlongStreamBriefInfo streamInfo : streamList) { + InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupInfo.getExtParams()); + // First get the tenant from the InlongGroup, and then get it from the PulsarCluster. + String tenant = pulsarDTO.getTenant(); + if (StringUtils.isBlank(tenant)) { + // If there are multiple Pulsar clusters, take the first one. + // Note that the tenants in multiple Pulsar clusters must be identical. List<InlongClusterEntity> pulsarClusters = clusterMapper.selectByKey(realClusterTag, null, ClusterType.PULSAR); if (CollectionUtils.isEmpty(pulsarClusters)) { LOGGER.error("GetDPConfig: not found pulsar cluster by cluster tag={}", realClusterTag); continue; } + PulsarClusterDTO cluster = PulsarClusterDTO.getFromJson(pulsarClusters.get(0).getExtParams()); + tenant = cluster.getTenant(); + } - // if there are multiple Pulsar clusters, take the first one - InlongClusterEntity cluster = pulsarClusters.get(0); - PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams()); - String tenant = pulsarCluster.getTenant(); - if (StringUtils.isBlank(tenant)) { - tenant = InlongConstants.DEFAULT_PULSAR_TENANT; - } - + List<InlongStreamBriefInfo> streamList = streamMapper.selectBriefList(groupId); + for (InlongStreamBriefInfo streamInfo : streamList) { String streamId = streamInfo.getInlongStreamId(); String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant, mqResource, streamInfo.getMqResource()); DataProxyTopicInfo topicConfig = new DataProxyTopicInfo(); + // must format to groupId/streamId, needed by DataProxy topicConfig.setInlongGroupId(groupId + "/" + streamId); topicConfig.setTopic(topic); topicList.add(topicConfig); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java index dceac1864..b3eb9346c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java @@ -75,7 +75,6 @@ public class KafkaClusterOperator extends AbstractClusterOperator { CommonBeanUtils.copyProperties(dto, kafkaClusterInfo); } - LOGGER.info("success to get kafka cluster info from entity"); return kafkaClusterInfo; } @@ -85,7 +84,6 @@ public class KafkaClusterOperator extends AbstractClusterOperator { CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true); try { KafkaClusterDTO dto = KafkaClusterDTO.getFromRequest(kafkaRequest); - dto.setBootstrapServers(kafkaRequest.getUrl()); targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); LOGGER.info("success to set entity for kafka cluster"); } catch (Exception e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java index a4c98bb11..2a724dc2b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java @@ -70,7 +70,6 @@ public class PulsarClusterOperator extends AbstractClusterOperator { CommonBeanUtils.copyProperties(dto, pulsarInfo); } - LOGGER.info("success to get pulsar cluster info from entity"); return pulsarInfo; } @@ -80,7 +79,6 @@ public class PulsarClusterOperator extends AbstractClusterOperator { CommonBeanUtils.copyProperties(pulsarRequest, targetEntity, true); try { PulsarClusterDTO dto = PulsarClusterDTO.getFromRequest(pulsarRequest); - dto.setServiceUrl(request.getUrl()); targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); LOGGER.info("success to set entity for pulsar cluster"); } catch (Exception e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java index 6edc3a243..cf1f20d61 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java @@ -37,6 +37,7 @@ import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo; import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo; import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.group.InlongGroupService; @@ -115,8 +116,17 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator { List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); Preconditions.checkNotEmpty(clusterInfos, "pulsar cluster not exist for groupId=" + groupId); consumeInfo.setClusterInfos(clusterInfos); - PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfos.get(0); - consumeInfo.setTopic(getFullPulsarTopic(groupInfo, pulsarCluster.getTenant(), entity.getTopic())); + + // First get the tenant from the InlongGroup, and then get it from the PulsarCluster. + String tenant = ((InlongPulsarInfo) groupInfo).getTenant(); + if (StringUtils.isBlank(tenant)) { + // If there are multiple Pulsar clusters, take the first one. + // Note that the tenants in multiple Pulsar clusters must be identical. + PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfos.get(0); + tenant = pulsarCluster.getTenant(); + } + + consumeInfo.setTopic(getFullPulsarTopic(groupInfo, tenant, entity.getTopic())); return consumeInfo; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 0efd42c32..13918600c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.inlong.common.constant.Constants; +import org.apache.inlong.common.constant.MQType; import org.apache.inlong.common.db.CommandEntity; import org.apache.inlong.common.enums.PullJobTypeEnum; import org.apache.inlong.common.enums.TaskTypeEnum; @@ -36,7 +37,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo; import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo; import org.apache.inlong.manager.common.consts.AgentConstants; import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.SourceStatus; @@ -59,6 +59,7 @@ import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest; import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; import org.apache.inlong.manager.service.core.AgentService; import org.apache.inlong.manager.service.source.SourceSnapshotOperator; @@ -259,9 +260,6 @@ public class AgentServiceImpl implements AgentService { /** * Query the tasks that source is waited to be operated.(only clusterName and ip matched it can be operated) - * - * @param request - * @return */ private List<DataConfig> processQueuedTasks(TaskRequest request) { HashSet<SourceStatus> needAddStatusSet = Sets.newHashSet(SourceStatus.TOBE_ISSUED_SET); @@ -307,7 +305,7 @@ public class AgentServiceImpl implements AgentService { List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes, TASK_FETCH_SIZE); for (StreamSourceEntity sourceEntity : sourceEntities) { - // refresh agentip and uuid to make it can be processed in queued task + // refresh agent ip and uuid to make it can be processed in queued task sourceEntity.setAgentIp(taskRequest.getAgentIp()); sourceEntity.setUuid(taskRequest.getUuid()); sourceMapper.updateByPrimaryKeySelective(sourceEntity); @@ -322,8 +320,6 @@ public class AgentServiceImpl implements AgentService { /** * Add subtasks to template tasks. * (Template task are agent_ip is null and template_id is null) - * - * @param taskRequest */ private void preProcessTemplateFileTask(TaskRequest taskRequest) { List<Integer> needCopiedStatusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(), @@ -368,8 +364,6 @@ public class AgentServiceImpl implements AgentService { * 1.agent ip match * 2.cluster name match * Send the corresponding task action request according to the matching state of the tag and the current state - * - * @param taskRequest */ private void preProcessLabelFileTasks(TaskRequest taskRequest) { List<Integer> needProcessedStatusList = Arrays.asList( @@ -388,42 +382,41 @@ public class AgentServiceImpl implements AgentService { List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpAndCluster(needProcessedStatusList, Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName); - sourceEntities.stream() - .forEach(sourceEntity -> { - // case: agent tag unbind and mismatch source task - Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet( - SourceStatus.SOURCE_FROZEN, - SourceStatus.TO_BE_ISSUED_FROZEN); - if (!matchLabel(sourceEntity, clusterNodeEntity) - && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) { - LOGGER.info("Transform task({}) from {} to {} because tag mismatch " - + "for agent({}) in cluster({})", sourceEntity.getAgentIp(), - sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), - agentIp, agentClusterName); - sourceMapper.updateStatus( - sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), false); - } - - // case: agent tag rebind and match source task again and stream is not in 'SUSPENDED' status - InlongStreamEntity streamEntity = streamMapper.selectByIdentifier( - sourceEntity.getInlongGroupId(), sourceEntity.getInlongStreamId()); - Set<SourceStatus> exceptedMatchedSourceStatus = Sets.newHashSet( - SourceStatus.SOURCE_NORMAL, - SourceStatus.TO_BE_ISSUED_ADD, - SourceStatus.TO_BE_ISSUED_ACTIVE); - Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet( - StreamStatus.SUSPENDED, StreamStatus.SUSPENDED); - if (matchLabel(sourceEntity, clusterNodeEntity) - && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus())) - && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) { - LOGGER.info("Transform task({}) from {} to {} because tag rematch " - + "for agent({}) in cluster({})", sourceEntity.getAgentIp(), - sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), - agentIp, agentClusterName); - sourceMapper.updateStatus( - sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), false); - } - }); + sourceEntities.forEach(sourceEntity -> { + // case: agent tag unbind and mismatch source task + Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet( + SourceStatus.SOURCE_FROZEN, + SourceStatus.TO_BE_ISSUED_FROZEN); + if (!matchLabel(sourceEntity, clusterNodeEntity) + && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) { + LOGGER.info("Transform task({}) from {} to {} because tag mismatch " + + "for agent({}) in cluster({})", sourceEntity.getAgentIp(), + sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), + agentIp, agentClusterName); + sourceMapper.updateStatus( + sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), false); + } + + // case: agent tag rebind and match source task again and stream is not in 'SUSPENDED' status + InlongStreamEntity streamEntity = streamMapper.selectByIdentifier( + sourceEntity.getInlongGroupId(), sourceEntity.getInlongStreamId()); + Set<SourceStatus> exceptedMatchedSourceStatus = Sets.newHashSet( + SourceStatus.SOURCE_NORMAL, + SourceStatus.TO_BE_ISSUED_ADD, + SourceStatus.TO_BE_ISSUED_ACTIVE); + Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet( + StreamStatus.SUSPENDED, StreamStatus.SUSPENDED); + if (matchLabel(sourceEntity, clusterNodeEntity) + && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus())) + && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) { + LOGGER.info("Transform task({}) from {} to {} because tag rematch " + + "for agent({}) in cluster({})", sourceEntity.getAgentIp(), + sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), + agentIp, agentClusterName); + sourceMapper.updateStatus( + sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), false); + } + }); } private InlongClusterNodeEntity selectByIpAndCluster(String clusterName, String ip) { @@ -436,9 +429,7 @@ public class AgentServiceImpl implements AgentService { nodeRequest.setKeyword(ip); nodeRequest.setParentId(clusterEntity.getId()); nodeRequest.setType(ClusterType.AGENT); - InlongClusterNodeEntity clusterNodeEntity = - clusterNodeMapper.selectByCondition(nodeRequest).stream().findFirst().orElse(null); - return clusterNodeEntity; + return clusterNodeMapper.selectByCondition(nodeRequest).stream().findFirst().orElse(null); } private int getOp(int status) { @@ -496,7 +487,7 @@ public class AgentServiceImpl implements AgentService { if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) { // add mq cluster setting List<MQClusterInfo> mqSet = new ArrayList<>(); - List<String> clusterTagList = Arrays.asList(groupEntity.getInlongClusterTag()); + List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag()); List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR); ClusterPageRequest pageRequest = ClusterPageRequest.builder() .typeList(typeList) @@ -512,16 +503,21 @@ public class AgentServiceImpl implements AgentService { mqSet.add(clusterInfo); } dataConfig.setMqClusters(mqSet); + // add topic setting - InlongClusterEntity cluster = mqClusterList.get(0); String mqResource = groupEntity.getMqResource(); String mqType = groupEntity.getMqType(); if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) { - PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams()); - String tenant = pulsarCluster.getTenant(); + // first get the tenant from the InlongGroup, and then get it from the PulsarCluster. + InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()); + String tenant = pulsarDTO.getTenant(); if (StringUtils.isBlank(tenant)) { - tenant = InlongConstants.DEFAULT_PULSAR_TENANT; + // If there are multiple Pulsar clusters, take the first one. + // Note that the tenants in multiple Pulsar clusters must be identical. + PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(mqClusterList.get(0).getExtParams()); + tenant = pulsarCluster.getTenant(); } + String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant, mqResource, streamEntity.getMqResource()); DataProxyTopicInfo topicConfig = new DataProxyTopicInfo(); @@ -594,6 +590,7 @@ public class AgentServiceImpl implements AgentService { : Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA)); Set<String> sourceLabels = Stream.of( sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet()); - return sourceLabels.stream().anyMatch(sourceLabel -> clusterNodeLabels.contains(sourceLabel)); + return sourceLabels.stream().anyMatch(clusterNodeLabels::contains); } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java index 810241f1a..f1cc29013 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java @@ -18,7 +18,6 @@ package org.apache.inlong.manager.service.group; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; @@ -122,10 +121,12 @@ public class InlongGroupOperator4Pulsar extends AbstractGroupOperator { public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) { PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne( groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR); - String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) - ? InlongConstants.DEFAULT_PULSAR_TENANT - : pulsarCluster.getTenant(); + // First get the tenant from the InlongGroup, and then get it from the PulsarCluster. + String tenant = ((InlongPulsarInfo) groupInfo).getTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getTenant(); + } InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo(); topicInfo.setTenant(tenant); topicInfo.setNamespace(groupInfo.getMqResource()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java index ed702cff5..4652c39d6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java @@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.listener.consume.apply; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.common.constant.MQType; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ConsumeStatus; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; @@ -36,6 +36,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo; import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm; import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator; @@ -87,11 +88,10 @@ public class ApproveConsumeProcessListener implements ProcessEventListener { String mqType = entity.getMqType(); if (MQType.TUBEMQ.equals(mqType)) { this.createTubeConsumerGroup(entity, context.getOperator()); - return ListenerResult.success("Create TubeMQ consumer group successful"); } else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) { this.createPulsarSubscription(entity); } else if (MQType.KAFKA.equals(mqType)) { - // TODO add Kafka + // Kafka consumers do not need to register in advance } else { throw new WorkflowListenerException("Unsupported MQ type " + mqType); } @@ -130,17 +130,17 @@ public class ApproveConsumeProcessListener implements ProcessEventListener { ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR); PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - PulsarTopicInfo topicMessage = new PulsarTopicInfo(); - String tenant = pulsarCluster.getTenant(); - if (StringUtils.isEmpty(tenant)) { - tenant = InlongConstants.DEFAULT_PULSAR_TENANT; + InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()); + String tenant = pulsarDTO.getTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getTenant(); } + PulsarTopicInfo topicMessage = new PulsarTopicInfo(); topicMessage.setTenant(tenant); topicMessage.setNamespace(mqResource); - String consumerGroup = entity.getConsumerGroup(); List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA)); - this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics); + this.createPulsarSubscription(pulsarAdmin, entity.getConsumerGroup(), topicMessage, topics); } catch (Exception e) { log.error("create pulsar topic failed", e); throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: " diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java index 7f3b69a8a..4b81123d4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java @@ -21,10 +21,11 @@ import com.google.common.base.Objects; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.common.constant.MQType; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.GroupStatus; +import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; @@ -83,22 +84,27 @@ public class PulsarResourceOperator implements QueueResourceOperator { String clusterTag = groupInfo.getInlongClusterTag(); log.info("begin to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag); + if (!(groupInfo instanceof InlongPulsarInfo)) { + throw new BusinessException("the mqType must be PULSAR for inlongGroupId=" + groupId); + } + + InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo; + String tenant = pulsarInfo.getTenant(); // get pulsar cluster via the inlong cluster tag from the inlong group List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); for (ClusterInfo clusterInfo : clusterInfos) { PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { // create pulsar tenant and namespace - String tenant = pulsarCluster.getTenant(); - if (StringUtils.isEmpty(tenant)) { - tenant = InlongConstants.DEFAULT_PULSAR_TENANT; + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getTenant(); } // if the group was not successful, need create tenant and namespace if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) { pulsarOperator.createTenant(pulsarAdmin, tenant); String namespace = groupInfo.getMqResource(); - pulsarOperator.createNamespace(pulsarAdmin, (InlongPulsarInfo) groupInfo, tenant, namespace); + pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace); log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}", groupId, tenant, namespace, pulsarCluster); @@ -131,7 +137,7 @@ public class PulsarResourceOperator implements QueueResourceOperator { PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; try { for (InlongStreamBriefInfo streamInfo : streamInfos) { - this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource()); + this.deletePulsarTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource()); } } catch (Exception e) { String msg = "failed to delete pulsar resource for groupId=" + groupId; @@ -192,7 +198,7 @@ public class PulsarResourceOperator implements QueueResourceOperator { for (ClusterInfo clusterInfo : clusterInfos) { PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; try { - this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource()); + this.deletePulsarTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource()); log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}", groupId, streamId, streamInfo.getMqResource(), pulsarCluster); } catch (Exception e) { @@ -211,9 +217,9 @@ public class PulsarResourceOperator implements QueueResourceOperator { private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName) throws Exception { try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - String tenant = pulsarCluster.getTenant(); - if (StringUtils.isEmpty(tenant)) { - tenant = InlongConstants.DEFAULT_PULSAR_TENANT; + String tenant = pulsarInfo.getTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getTenant(); } String namespace = pulsarInfo.getMqResource(); PulsarTopicInfo topicInfo = PulsarTopicInfo.builder() @@ -233,9 +239,9 @@ public class PulsarResourceOperator implements QueueResourceOperator { private void createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName, String streamId) throws Exception { try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - String tenant = pulsarCluster.getTenant(); - if (StringUtils.isEmpty(tenant)) { - tenant = InlongConstants.DEFAULT_PULSAR_TENANT; + String tenant = pulsarInfo.getTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getTenant(); } String namespace = pulsarInfo.getMqResource(); String fullTopicName = tenant + "/" + namespace + "/" + topicName; @@ -274,14 +280,14 @@ public class PulsarResourceOperator implements QueueResourceOperator { * Delete Pulsar Topic and Subscription, and delete the consumer group info. * TODO delete Subscription and InlongConsume info */ - private void deletePulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName) + private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName) throws Exception { try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - String tenant = pulsarCluster.getTenant(); - if (StringUtils.isEmpty(tenant)) { - tenant = InlongConstants.DEFAULT_PULSAR_TENANT; + String tenant = pulsarInfo.getTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getTenant(); } - String namespace = groupInfo.getMqResource(); + String namespace = pulsarInfo.getMqResource(); PulsarTopicInfo topicInfo = PulsarTopicInfo.builder() .tenant(tenant) .namespace(namespace) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java index a29020a0e..a20629e4e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java @@ -23,7 +23,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.common.enums.DataTypeEnum; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; @@ -33,6 +32,7 @@ import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.source.kafka.KafkaSource; @@ -112,12 +112,15 @@ public class PulsarSourceOperator extends AbstractSourceOperator { PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; String adminUrl = pulsarCluster.getAdminUrl(); String serviceUrl = pulsarCluster.getUrl(); - String tenant = StringUtils.isEmpty(pulsarCluster.getTenant()) - ? InlongConstants.DEFAULT_PULSAR_TENANT - : pulsarCluster.getTenant(); + + // First get the tenant from the InlongGroup, and then get it from the PulsarCluster. + String tenant = ((InlongPulsarInfo) groupInfo).getTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getTenant(); + } Map<String, List<StreamSource>> sourceMap = Maps.newHashMap(); - streamInfos.forEach(streamInfo -> { + for (InlongStreamInfo streamInfo : streamInfos) { PulsarSource pulsarSource = new PulsarSource(); String streamId = streamInfo.getInlongStreamId(); pulsarSource.setSourceName(streamId); @@ -166,7 +169,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator { pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue()); pulsarSource.setFieldList(streamInfo.getFieldList()); sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource); - }); + } return sourceMap; } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java index 4e659a055..41afde323 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java @@ -76,7 +76,6 @@ public class InlongClusterServiceTest extends ServiceBaseTest { request.setName(clusterName); request.setType(ClusterType.PULSAR); request.setAdminUrl(adminUrl); - request.setTenant("public"); request.setInCharges(GLOBAL_OPERATOR); return clusterService.save(request, GLOBAL_OPERATOR); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java index 44bfe7258..3742aaaac 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java @@ -156,8 +156,6 @@ public class InlongConsumeServiceTest extends ServiceBaseTest { request.setType(ClusterType.PULSAR); String adminUrl = "http://127.0.0.1:8080"; request.setAdminUrl(adminUrl); - String tenant = "public"; - request.setTenant(tenant); request.setInCharges(GLOBAL_OPERATOR); clusterId = clusterService.save(request, GLOBAL_OPERATOR); }