This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit d4d737167d2f5df013cb457939f2def6961c04c4 Author: vernedeng <deng...@pku.edu.cn> AuthorDate: Wed Nov 9 17:48:51 2022 +0800 [INLONG-6463][Manager] Support create subscription and topic of multiple pulsar cluster (#6464) --- .../queue/pulsar/PulsarResourceOperator.java | 163 ++++++++++++--------- 1 file changed, 95 insertions(+), 68 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java index 5b698b856..23c13b740 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java @@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.common.util.Preconditions; -import org.apache.inlong.manager.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; @@ -45,6 +44,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.stream.Collectors; /** * Operator for create Pulsar Tenant, Namespace, Topic and Subscription @@ -84,42 +84,51 @@ public class PulsarResourceOperator implements QueueResourceOperator { // get pulsar cluster via the inlong cluster tag from the inlong group String clusterTag = groupInfo.getInlongClusterTag(); - PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(clusterTag, null, - ClusterType.PULSAR); - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - // create pulsar tenant and namespace - String tenant = pulsarCluster.getTenant(); - if (StringUtils.isEmpty(tenant)) { - tenant = InlongConstants.DEFAULT_PULSAR_TENANT; - } - String namespace = groupInfo.getMqResource(); - InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo; - // if the group was not successful, need create tenant and namespace - if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) { - pulsarOperator.createTenant(pulsarAdmin, tenant); - log.info("success to create pulsar tenant for groupId={}, tenant={}", groupId, tenant); - pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace); - log.info("success to create pulsar namespace for groupId={}, namespace={}", groupId, namespace); - } + List<PulsarClusterInfo> pulsarClusters = + clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream() + .map(clusterInfo -> (PulsarClusterInfo) clusterInfo) + .collect(Collectors.toList()); + for (PulsarClusterInfo pulsarCluster : pulsarClusters) { + try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { + String clusterName = pulsarCluster.getName(); + // create pulsar tenant and namespace + String tenant = pulsarCluster.getTenant(); + if (StringUtils.isEmpty(tenant)) { + tenant = InlongConstants.DEFAULT_PULSAR_TENANT; + } + String namespace = groupInfo.getMqResource(); + InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo; + // if the group was not successful, need create tenant and namespace + if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) { + pulsarOperator.createTenant(pulsarAdmin, tenant); + log.info("success to create pulsar tenant for groupId={}, tenant={}, cluster={}", + groupId, tenant, clusterName); + pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace); + log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}", + groupId, namespace, clusterName); + } - // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic - List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); - if (streamInfoList == null || streamInfoList.isEmpty()) { - log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId); - return; + // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic + List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); + if (streamInfoList == null || streamInfoList.isEmpty()) { + log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}", + groupId, clusterName); + return; + } + // create pulsar topic and subscription + for (InlongStreamBriefInfo stream : streamInfoList) { + this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource()); + this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(), + stream.getInlongStreamId()); + } + } catch (Exception e) { + String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId, + pulsarCluster.toString()); + log.error(msg, e); + throw new WorkflowListenerException(msg + ": " + e.getMessage()); } - // create pulsar topic and subscription - for (InlongStreamBriefInfo stream : streamInfoList) { - this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource()); - this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(), stream.getInlongStreamId()); - } - } catch (Exception e) { - String msg = String.format("failed to create pulsar resource for groupId=%s", groupId); - log.error(msg, e); - throw new WorkflowListenerException(msg + ": " + e.getMessage()); } - - log.info("success to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster); + log.info("success to create pulsar resource for groupId={}", groupId); } @Override @@ -129,22 +138,28 @@ public class PulsarResourceOperator implements QueueResourceOperator { String groupId = groupInfo.getInlongGroupId(); log.info("begin to delete pulsar resource for groupId={}", groupId); - ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR); - try { - List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); - if (streamInfoList == null || streamInfoList.isEmpty()) { - log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId); - return; - } - for (InlongStreamBriefInfo streamInfo : streamInfoList) { - this.deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource()); + List<PulsarClusterInfo> pulsarClusters = + clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream() + .map(clusterInfo -> (PulsarClusterInfo) clusterInfo) + .collect(Collectors.toList()); + for (PulsarClusterInfo clusterInfo : pulsarClusters) { + String clusterName = clusterInfo.getName(); + try { + List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId); + if (streamInfoList == null || streamInfoList.isEmpty()) { + log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}", + groupId, clusterName); + return; + } + for (InlongStreamBriefInfo streamInfo : streamInfoList) { + this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource()); + } + } catch (Exception e) { + log.error("failed to delete pulsar resource for groupId={}, cluster={}", groupId, clusterName, e); + throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage()); } - } catch (Exception e) { - log.error("failed to delete pulsar resource for groupId=" + groupId, e); - throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage()); } - - log.info("success to delete pulsar resource for groupId={}, cluster={}", groupId, clusterInfo); + log.info("success to delete pulsar resource for groupId={}", groupId); } @Override @@ -157,17 +172,22 @@ public class PulsarResourceOperator implements QueueResourceOperator { String streamId = streamInfo.getInlongStreamId(); log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId); - try { - // get pulsar cluster via the inlong cluster tag from the inlong group - PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne( - groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR); - // create pulsar topic and subscription - this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource()); - this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource(), streamId); - } catch (Exception e) { - String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId); - log.error(msg, e); - throw new WorkflowListenerException(msg + ": " + e.getMessage()); + List<PulsarClusterInfo> pulsarClusters = + clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream() + .map(clusterInfo -> (PulsarClusterInfo) clusterInfo) + .collect(Collectors.toList()); + for (PulsarClusterInfo pulsarCluster : pulsarClusters) { + try { + // create pulsar topic and subscription + this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource()); + this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, + streamInfo.getMqResource(), streamId); + } catch (Exception e) { + String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s", + groupId, streamId,pulsarCluster.getName()); + log.error(msg, e); + throw new WorkflowListenerException(msg + ": " + e.getMessage()); + } } log.info("success to create pulsar resource for groupId={}, streamId={}", groupId, streamId); @@ -182,16 +202,23 @@ public class PulsarResourceOperator implements QueueResourceOperator { String streamId = streamInfo.getInlongStreamId(); log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId); - try { - ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR); - this.deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource()); - log.info("success to delete pulsar topic for groupId={}, streamId={}", groupId, streamId); - } catch (Exception e) { - String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s", groupId, streamId); - log.error(msg, e); - throw new WorkflowListenerException(msg); + List<PulsarClusterInfo> pulsarClusters = + clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream() + .map(clusterInfo -> (PulsarClusterInfo) clusterInfo) + .collect(Collectors.toList()); + for (PulsarClusterInfo clusterInfo : pulsarClusters) { + String clusterName = clusterInfo.getName(); + try { + this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource()); + log.info("success to delete pulsar topic for groupId={}, streamId={}, cluster={}", + groupId, streamId, clusterName); + } catch (Exception e) { + String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s, cluster=%s", + groupId, streamId, clusterName); + log.error(msg, e); + throw new WorkflowListenerException(msg); + } } - log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId); }