This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d8c83804b1 [INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP (#8941) d8c83804b1 is described below commit d8c83804b1078089a233779b042f24067378dbc4 Author: haibo.duan <dhaibo1...@live.cn> AuthorDate: Tue Nov 28 20:10:51 2023 +0800 [INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP (#8941) --- .../inlong/manager/common/util/HttpUtils.java | 13 + .../queue/pulsar/PulsarBrokerEntryMetadata.java | 33 + .../pojo/queue/pulsar/PulsarLookupTopicInfo.java | 35 + .../pojo/queue/pulsar/PulsarMessageInfo.java | 40 + .../pojo/queue/pulsar/PulsarMessageMetadata.java | 63 ++ .../pojo/queue/pulsar/PulsarNamespacePolicies.java | 34 + .../queue/pulsar/PulsarPersistencePolicies.java | 35 + .../pojo/queue/pulsar/PulsarRetentionPolicies.java | 33 + .../pojo/queue/pulsar/PulsarTenantInfo.java | 36 + .../pojo/queue/pulsar/PulsarTopicMetadata.java | 38 + inlong-manager/manager-service/pom.xml | 85 +- .../service/cluster/PulsarClusterOperator.java | 10 +- .../apply/ApproveConsumeProcessListener.java | 10 +- .../node/pulsar/PulsarDataNodeOperator.java | 10 +- .../resource/queue/pulsar/PulsarOperator.java | 212 +++-- .../queue/pulsar/PulsarQueueResourceOperator.java | 163 ++-- .../service/resource/queue/pulsar/PulsarUtils.java | 958 ++++++++++++++++++++- .../sink/pulsar/PulsarResourceOperator.java | 14 +- .../manager/service/queue/PulsarUtilsTest.java | 581 ++++++++++++- 19 files changed, 2072 insertions(+), 331 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java index ace3d513f1..1e7efe68ae 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java @@ 
-112,6 +112,19 @@ public class HttpUtils { return response.getBody(); } + /** + * Send an void HTTP request + */ + public static void request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody, + HttpHeaders header) { + log.debug("begin request to {} by request body {}", url, GSON.toJson(requestBody)); + HttpEntity<Object> requestEntity = new HttpEntity<>(requestBody, header); + ResponseEntity<String> response = restTemplate.exchange(url, httpMethod, requestEntity, String.class); + + log.debug("success request to {}, status code {}", url, response.getStatusCode()); + Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(), "Request failed"); + } + /** * Send GET request to the specified URL. */ diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java new file mode 100644 index 0000000000..de02d96ad6 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarBrokerEntryMetadata { + + private long brokerTimestamp; + private long index; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java new file mode 100644 index 0000000000..12596e2ab3 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarLookupTopicInfo { + + private String brokerUrl; + private String httpUrl; + private String nativeUrl; + private String brokerUrlSsl; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java new file mode 100644 index 0000000000..c84b8cbf9f --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarMessageInfo { + + private String messageId; + private String topic; + private byte[] body; + private transient Map<String, String> properties; + private boolean poolMessage; + private PulsarBrokerEntryMetadata pulsarBrokerEntryMetadata; + private PulsarMessageMetadata pulsarMessageMetadata; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java new file mode 100644 index 0000000000..3cfbae2fb4 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarMessageMetadata { + + private int payloadSize; + private String partitionKey; + private boolean compactedOut; + private long eventTime; + private boolean partitionKeyB64Encoded; + private byte[] orderingKey; + private long sequenceId; + private boolean nullValue; + private boolean nullPartitionKey; + private Map<String, String> properties; + private long publishTime; + private long deliverAtTime; + private int markerType; + private long txnidLeastBits; + private long txnidMostBits; + private long highestSequenceId; + private String uuid; + private int numChunksFromMsg; + private int totalChunkMsgSize; + private int chunkId; + private String producerName; + private String replicatedFrom; + private int uncompressedSize; + private int numMessagesInBatch; + private String encryptionAlgo; + private String compression; + private byte[] encryptionParam; + private byte[] schemaVersion; + private List<String> replicateTos; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java new file mode 100644 index 0000000000..94dbd47bf7 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarNamespacePolicies { + + private int messageTtlInSeconds; + private PulsarRetentionPolicies retentionPolicies; + private PulsarPersistencePolicies persistence; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java new file mode 100644 index 0000000000..ada4f914fe --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarPersistencePolicies { + + private int bookkeeperEnsemble; + private int bookkeeperWriteQuorum; + private int bookkeeperAckQuorum; + private double managedLedgerMaxMarkDeleteRate; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java new file mode 100644 index 0000000000..155675291e --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarRetentionPolicies { + + private int retentionTimeInMinutes; + private long retentionSizeInMB; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java new file mode 100644 index 0000000000..2e093c1a57 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Set; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarTenantInfo { + + Set<String> adminRoles; + + Set<String> allowedClusters; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java new file mode 100644 index 0000000000..c7de1aacf2 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.queue.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PulsarTopicMetadata { + + private int partitions; + + private boolean deleted; + + private Map<String, String> properties; +} diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml index f3bd26f7f8..5981d30332 100644 --- a/inlong-manager/manager-service/pom.xml +++ b/inlong-manager/manager-service/pom.xml @@ -122,36 +122,6 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-client</artifactId> - <exclusions> - <exclusion> - <groupId>javax.validation</groupId> - <artifactId>validation-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.pulsar</groupId> - <artifactId>bouncy-castle-bc</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>bcpkix-jdk15on</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>bcprov-jdk15on</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>bcutil-jdk15on</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>bcprov-ext-jdk15on</artifactId> - </exclusion> - </exclusions> - </dependency> <dependency> <groupId>com.tencentcloudapi</groupId> <artifactId>tencentcloud-sdk-java-cls</artifactId> @@ -288,48 +258,6 @@ </exclusions> </dependency> - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-client-admin</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </exclusion> - <exclusion> - <groupId>org.checkerframework</groupId> - <artifactId>checker-qual</artifactId> - </exclusion> - <exclusion> - 
<groupId>javax.validation</groupId> - <artifactId>validation-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>jul-to-slf4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.pulsar</groupId> - <artifactId>bouncy-castle-bc</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>bcpkix-jdk15on</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>bcprov-jdk15on</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>bcutil-jdk15on</artifactId> - </exclusion> - <exclusion> - <groupId>org.bouncycastle</groupId> - <artifactId>bcprov-ext-jdk15on</artifactId> - </exclusion> - </exclusions> - </dependency> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId> @@ -572,6 +500,19 @@ <artifactId>sdk-common</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>pulsar</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java index b2dea0bb7b..d4b34e83f2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java @@ -33,11 +33,11 @@ import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils; 
import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; import java.io.IOException; import java.net.InetSocketAddress; @@ -60,6 +60,9 @@ public class PulsarClusterOperator extends AbstractClusterOperator { @Autowired private ObjectMapper objectMapper; + @Autowired + private RestTemplate restTemplate; + @Override public Boolean accept(String clusterType) { return getClusterType().equals(clusterType); @@ -127,9 +130,10 @@ public class PulsarClusterOperator extends AbstractClusterOperator { * @return */ private Boolean testConnectAdminUrl(PulsarClusterInfo pulsarInfo) { - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo)) { + + try { // test connect for pulsar adminUrl - pulsarAdmin.tenants().getTenants(); + PulsarUtils.getTenants(restTemplate, pulsarInfo); return true; } catch (Exception e) { String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarInfo.getAdminUrl()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java index 1e6114bffa..5b39e14bf2 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java @@ -38,7 +38,6 @@ import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo; import 
org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm; import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator; -import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils; import org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQOperator; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; @@ -46,7 +45,6 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -131,7 +129,7 @@ public class ApproveConsumeProcessListener implements ProcessEventListener { String clusterTag = groupEntity.getInlongClusterTag(); ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR); PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { + try { InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams()); String tenant = pulsarDTO.getPulsarTenant(); if (StringUtils.isBlank(tenant)) { @@ -142,7 +140,7 @@ public class ApproveConsumeProcessListener implements ProcessEventListener { topicMessage.setNamespace(mqResource); List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA)); - this.createPulsarSubscription(pulsarAdmin, entity.getConsumerGroup(), topicMessage, topics); + this.createPulsarSubscription(pulsarCluster, entity.getConsumerGroup(), topicMessage, topics); } catch (Exception e) { log.error("create pulsar topic failed", e); throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: " @@ 
-150,10 +148,10 @@ public class ApproveConsumeProcessListener implements ProcessEventListener { } } - private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo, + private void createPulsarSubscription(PulsarClusterInfo clusterInfo, String subscription, PulsarTopicInfo topicInfo, List<String> topics) { try { - pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicInfo, topics); + pulsarOperator.createSubscriptions(clusterInfo, subscription, topicInfo, topics); } catch (Exception e) { log.error("create pulsar consumer group failed", e); throw new WorkflowListenerException("failed to create pulsar consumer group"); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java index 819df3df34..6b9025741b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java @@ -34,11 +34,11 @@ import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; /** * Pulsar data node operator @@ -51,6 +51,9 @@ public class PulsarDataNodeOperator extends AbstractDataNodeOperator { @Autowired private ObjectMapper objectMapper; + @Autowired + private RestTemplate restTemplate; + @Override public Boolean accept(String dataNodeType) { return 
getDataNodeType().equals(dataNodeType); @@ -106,15 +109,14 @@ public class PulsarDataNodeOperator extends AbstractDataNodeOperator { PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().adminUrl(adminUrl) .token(token).build(); - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) { + try { // test connect for pulsar adminUrl - pulsarAdmin.tenants().getTenants(); + PulsarUtils.getTenants(restTemplate, pulsarClusterInfo); return true; } catch (Exception e) { String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarClusterInfo.getAdminUrl()); LOGGER.error(errMsg, e); throw new BusinessException(errMsg); } - } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java index 77feca618c..7c55f65e43 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java @@ -23,9 +23,16 @@ import org.apache.inlong.manager.common.conversion.ConversionHandle; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarPersistencePolicies; +import 
org.apache.inlong.manager.pojo.queue.pulsar.PulsarRetentionPolicies; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo; import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl; import org.apache.inlong.manager.service.message.DeserializeOperator; @@ -33,19 +40,12 @@ import org.apache.inlong.manager.service.message.DeserializeOperatorFactory; import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.admin.Namespaces; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; -import org.apache.pulsar.common.policies.data.PersistencePolicies; -import org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; import java.util.ArrayList; import java.util.List; @@ -65,42 +65,44 @@ public class PulsarOperator { private static final int MAX_PARTITION = 1000; private static final int RETRY_TIMES = 3; private static final int DELAY_SECONDS = 5; - private static final String PARSE_ATTR_ERROR_STRING = "Could not find %s in attributes!"; @Autowired public DeserializeOperatorFactory deserializeOperatorFactory; @Autowired private ConversionHandle conversionHandle; + @Autowired + private RestTemplate restTemplate; /** * Create Pulsar tenant */ - 
public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException { + public void createTenant(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception { LOGGER.info("begin to create pulsar tenant={}", tenant); Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER, "Tenant cannot be empty"); try { - List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin); - boolean exists = this.tenantIsExists(pulsarAdmin, tenant); + List<String> clusters = PulsarUtils.getClusters(restTemplate, pulsarClusterInfo); + boolean exists = this.tenantIsExists(pulsarClusterInfo, tenant); if (exists) { LOGGER.warn("pulsar tenant={} already exists, skip to create", tenant); return; } - TenantInfoImpl tenantInfo = new TenantInfoImpl(); + PulsarTenantInfo tenantInfo = new PulsarTenantInfo(); tenantInfo.setAllowedClusters(Sets.newHashSet(clusters)); tenantInfo.setAdminRoles(Sets.newHashSet()); - pulsarAdmin.tenants().createTenant(tenant, tenantInfo); + PulsarUtils.createTenant(restTemplate, pulsarClusterInfo, tenant, tenantInfo); LOGGER.info("success to create pulsar tenant={}", tenant); - } catch (PulsarAdminException e) { + } catch (Exception e) { LOGGER.error("failed to create pulsar tenant=" + tenant, e); throw e; } } /** - * Create Pulsar namespace + * Create Pulsar namespace. 
*/ - public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo, String tenant, String namespace) - throws PulsarAdminException { + public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInfo pulsarInfo, String tenant, + String namespace) + throws Exception { Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER, "pulsar tenant cannot be empty during create namespace"); Preconditions.expectNotBlank(namespace, ErrorCodeEnum.INVALID_PARAMETER, @@ -110,19 +112,17 @@ public class PulsarOperator { LOGGER.info("begin to create namespace={}", namespaceName); try { // Check whether the namespace exists, and create it if it does not exist - boolean isExists = this.namespaceExists(pulsarAdmin, tenant, namespace); + boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant, namespace); if (isExists) { LOGGER.warn("namespace={} already exists, skip to create", namespaceName); return; } - List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin); - Namespaces namespaces = pulsarAdmin.namespaces(); - namespaces.createNamespace(namespaceName, Sets.newHashSet(clusters)); + PulsarNamespacePolicies policies = new PulsarNamespacePolicies(); // Configure message TTL Integer ttl = pulsarInfo.getTtl(); if (ttl > 0) { - namespaces.setNamespaceMessageTTL(namespaceName, conversionHandle.handleConversion(ttl, + policies.setMessageTtlInSeconds(conversionHandle.handleConversion(ttl, pulsarInfo.getTtlUnit().toLowerCase() + "_seconds")); } @@ -139,15 +139,17 @@ public class PulsarOperator { } // Configure retention policies - RetentionPolicies retentionPolicies = new RetentionPolicies(retentionTime, retentionSize); - namespaces.setRetention(namespaceName, retentionPolicies); + PulsarRetentionPolicies retentionPolicies = new PulsarRetentionPolicies(retentionTime, retentionSize); + policies.setRetentionPolicies(retentionPolicies); // Configure persistence policies - PersistencePolicies persistencePolicies = new 
PersistencePolicies(pulsarInfo.getEnsemble(), + PulsarPersistencePolicies persistencePolicies = new PulsarPersistencePolicies(pulsarInfo.getEnsemble(), pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), pulsarInfo.getMaxMarkDeleteRate()); - namespaces.setPersistence(namespaceName, persistencePolicies); + policies.setPersistence(persistencePolicies); + + PulsarUtils.createNamespace(restTemplate, pulsarClusterInfo, tenant, namespaceName, policies); LOGGER.info("success to create namespace={}", namespaceName); - } catch (PulsarAdminException e) { + } catch (Exception e) { LOGGER.error("failed to create namespace=" + namespaceName, e); throw e; } @@ -156,7 +158,7 @@ public class PulsarOperator { /** * Create Pulsar topic */ - public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException { + public void createTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo topicInfo) throws Exception { Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be empty"); String tenant = topicInfo.getPulsarTenant(); String namespace = topicInfo.getNamespace(); @@ -164,33 +166,35 @@ public class PulsarOperator { String fullTopicName = tenant + "/" + namespace + "/" + topicName; // Topic will be returned if it exists, and created if it does not exist - if (topicExists(pulsarAdmin, tenant, namespace, topicName, + if (topicExists(pulsarClusterInfo, tenant, namespace, topicName, InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) { - LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarAdmin.getServiceUrl()); + LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarClusterInfo.getAdminUrl()); return; } try { if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicInfo.getQueueModule())) { - pulsarAdmin.topics().createNonPartitionedTopic(fullTopicName); - String res = pulsarAdmin.lookups().lookupTopic(fullTopicName); + 
PulsarUtils.createNonPartitionedTopic(restTemplate, pulsarClusterInfo, fullTopicName); + String res = PulsarUtils.lookupTopic(restTemplate, pulsarClusterInfo, fullTopicName); LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName, res); } else { // The number of brokers as the default value of topic partition - List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin); + List<String> clusters = PulsarUtils.getClusters(restTemplate, pulsarClusterInfo); Integer numPartitions = topicInfo.getNumPartitions(); if (numPartitions < 0 || numPartitions >= MAX_PARTITION) { - List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0)); + List<String> brokers = PulsarUtils.getBrokers(restTemplate, pulsarClusterInfo); numPartitions = brokers.size(); } - - pulsarAdmin.topics().createPartitionedTopic(fullTopicName, numPartitions); - Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName); + PulsarUtils.createPartitionedTopic(restTemplate, pulsarClusterInfo, fullTopicName, + numPartitions); + Map<String, String> res = PulsarUtils.lookupPartitionedTopic(restTemplate, + pulsarClusterInfo, fullTopicName); // if lookup failed (res.size not equals the partition number) - if (res.keySet().size() != numPartitions) { + if (res.size() != numPartitions) { // look up partition failed, retry to get partition numbers - for (int i = 0; (i < RETRY_TIMES && res.keySet().size() != numPartitions); i++) { - res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName); + for (int i = 0; (i < RETRY_TIMES && res.size() != numPartitions); i++) { + res = PulsarUtils.lookupPartitionedTopic(restTemplate, pulsarClusterInfo, + fullTopicName); try { Thread.sleep(500); } catch (InterruptedException e) { @@ -198,12 +202,12 @@ public class PulsarOperator { } } } - if (numPartitions != res.keySet().size()) { - throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic"); + if (numPartitions != 
res.size()) { + throw new Exception("The number of partitions not equal to lookupPartitionedTopic"); } LOGGER.info("success to create topic={}", fullTopicName); } - } catch (PulsarAdminException e) { + } catch (Exception e) { LOGGER.error("failed to create topic=" + fullTopicName, e); throw e; } @@ -212,25 +216,25 @@ public class PulsarOperator { /** * Force delete Pulsar topic */ - public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException { + public void forceDeleteTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo topicInfo) throws Exception { Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be empty"); String tenant = topicInfo.getPulsarTenant(); String namespace = topicInfo.getNamespace(); String topic = topicInfo.getTopicName(); String fullTopicName = tenant + "/" + namespace + "/" + topic; + boolean isPartitioned = InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()); // Topic will be returned if it not exists - if (topicExists(pulsarAdmin, tenant, namespace, topic, - InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) { + if (topicExists(pulsarClusterInfo, tenant, namespace, topic, isPartitioned)) { LOGGER.warn("pulsar topic={} already delete", fullTopicName); return; } try { - pulsarAdmin.topics().delete(fullTopicName, true); + PulsarUtils.forceDeleteTopic(restTemplate, pulsarClusterInfo, fullTopicName, isPartitioned); LOGGER.info("success to delete topic={}", fullTopicName); - } catch (PulsarAdminException e) { + } catch (Exception e) { LOGGER.error("failed to delete topic=" + fullTopicName, e); throw e; } @@ -239,19 +243,20 @@ public class PulsarOperator { /** * Create a Pulsar subscription for the given topic */ - public void createSubscription(PulsarAdmin pulsarAdmin, String fullTopicName, String queueModule, - String subscription) throws PulsarAdminException { + public void createSubscription(PulsarClusterInfo 
pulsarClusterInfo, String fullTopicName, String queueModule, + String subscription) throws Exception { LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, fullTopicName); try { - boolean isExists = this.subscriptionExists(pulsarAdmin, fullTopicName, subscription, + boolean isExists = this.subscriptionExists(pulsarClusterInfo, fullTopicName, subscription, InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(queueModule)); if (isExists) { LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription); return; } - pulsarAdmin.topics().createSubscription(fullTopicName, subscription, MessageId.latest); + + PulsarUtils.createSubscription(restTemplate, pulsarClusterInfo, fullTopicName, subscription); LOGGER.info("success to create subscription={}", subscription); - } catch (PulsarAdminException e) { + } catch (Exception e) { LOGGER.error("failed to create pulsar subscription=" + subscription, e); throw e; } @@ -260,31 +265,42 @@ public class PulsarOperator { /** * Create a Pulsar subscription for the specified topic list */ - public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo, - List<String> topicList) throws PulsarAdminException { + public void createSubscriptions(PulsarClusterInfo pulsarClusterInfo, String subscription, PulsarTopicInfo topicInfo, + List<String> topicList) throws Exception { for (String topic : topicList) { topicInfo.setTopicName(topic); String fullTopicName = topicInfo.getPulsarTenant() + "/" + topicInfo.getNamespace() + "/" + topic; - this.createSubscription(pulsarAdmin, fullTopicName, topicInfo.getQueueModule(), subscription); + this.createSubscription(pulsarClusterInfo, fullTopicName, topicInfo.getQueueModule(), subscription); } LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList); } /** * Check if Pulsar tenant exists + * + * @param pulsarClusterInfo pulsar cluster info + * @param tenant pulsar tenant info + 
* @return true or false + * @throws Exception any exception if occurred */ - private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException { - List<String> tenantList = pulsarAdmin.tenants().getTenants(); - return tenantList.contains(tenant); + private boolean tenantIsExists(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception { + List<String> tenants = PulsarUtils.getTenants(restTemplate, pulsarClusterInfo); + return tenants.contains(tenant); } /** - * Check whether the Pulsar namespace exists under the specified tenant + * Check whether the Pulsar namespace exists under the specified tenant. + * + * @param pulsarClusterInfo pulsar cluster info + * @param tenant pulsar tenant info + * @param namespace pulsar namespace info + * @return true or false + * @throws Exception any exception if occurred */ - private boolean namespaceExists(PulsarAdmin pulsarAdmin, String tenant, String namespace) - throws PulsarAdminException { - List<String> namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant); - return namespaceList.contains(tenant + "/" + namespace); + private boolean namespaceExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace) + throws Exception { + List<String> namespaces = PulsarUtils.getNamespaces(restTemplate, pulsarClusterInfo, tenant); + return namespaces.contains(namespace); } /** @@ -293,22 +309,23 @@ public class PulsarOperator { * @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as: * Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist */ - public boolean topicExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topicName, + public boolean topicExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace, String topicName, boolean isPartitioned) { if (StringUtils.isBlank(topicName)) { return true; } // persistent://tenant/namespace/topic - List<String> topicList; 
+ List<String> topics; boolean topicExists = false; try { if (isPartitioned) { - topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace); + topics = PulsarUtils.getPartitionedTopics(restTemplate, pulsarClusterInfo, tenant, + namespace); } else { - topicList = pulsarAdmin.topics().getList(tenant + "/" + namespace); + topics = PulsarUtils.getTopics(restTemplate, pulsarClusterInfo, tenant, namespace); } - for (String t : topicList) { + for (String t : topics) { t = t.substring(t.lastIndexOf("/") + 1); // not contains / if (!isPartitioned) { int suffixIndex = t.lastIndexOf("-partition-"); @@ -321,7 +338,7 @@ public class PulsarOperator { break; } } - } catch (PulsarAdminException pe) { + } catch (Exception pe) { LOGGER.error("check if the pulsar topic={} exists error, begin retry", topicName, pe); int count = 0; try { @@ -329,8 +346,9 @@ public class PulsarOperator { LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topicName, count); Thread.sleep(DELAY_SECONDS); - topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace); - for (String t : topicList) { + topics = PulsarUtils.getPartitionedTopics(restTemplate, pulsarClusterInfo, + tenant, namespace); + for (String t : topics) { t = t.substring(t.lastIndexOf("/") + 1); if (topicName.equals(t)) { topicExists = true; @@ -348,7 +366,7 @@ public class PulsarOperator { /** * Check whether the Pulsar topic exists. 
*/ - private boolean subscriptionExists(PulsarAdmin pulsarAdmin, String topic, String subscription, + private boolean subscriptionExists(PulsarClusterInfo pulsarClusterInfo, String topic, String subscription, boolean isPartitioned) { int count = 0; while (++count <= RETRY_TIMES) { @@ -358,22 +376,24 @@ public class PulsarOperator { // first lookup to load the topic, and then query whether the subscription exists if (isPartitioned) { - Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic); + Map<String, String> topicMap = PulsarUtils.lookupPartitionedTopic(restTemplate, + pulsarClusterInfo, topic); if (topicMap.isEmpty()) { LOGGER.error("result of lookups topic={} is empty, continue retry", topic); continue; } } else { - String lookupTopic = pulsarAdmin.lookups().lookupTopic(topic); + String lookupTopic = PulsarUtils.lookupTopic(restTemplate, pulsarClusterInfo, topic); if (StringUtils.isBlank(lookupTopic)) { LOGGER.error("result of lookups topic={} is empty, continue retry", topic); continue; } } - List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic); + List<String> subscriptionList = PulsarUtils.getSubscriptions(restTemplate, + pulsarClusterInfo, topic); return subscriptionList.contains(subscription); - } catch (PulsarAdminException | InterruptedException e) { + } catch (Exception e) { LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e); if (count == RETRY_TIMES) { LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic); @@ -387,16 +407,17 @@ public class PulsarOperator { /** * Query topic message for the given pulsar cluster. 
*/ - public List<BriefMQMessage> queryLatestMessage(PulsarAdmin pulsarAdmin, String topicFullName, String subName, + public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterInfo, String topicFullName, + String subName, Integer messageCount, InlongStreamInfo streamInfo, boolean serial) { LOGGER.info("begin to query message for topic {}, subName={}", topicFullName, subName); List<BriefMQMessage> messageList = new ArrayList<>(); - int partitionCount = getPartitionCount(pulsarAdmin, topicFullName); + int partitionCount = getPartitionCount(pulsarClusterInfo, topicFullName); for (int messageIndex = 0; messageIndex < messageCount; messageIndex++) { int currentPartitionNum = messageIndex % partitionCount; int messagePosition = messageIndex / partitionCount; String topicNameOfPartition = buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial); - messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarAdmin, messageIndex, + messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarClusterInfo, messageIndex, streamInfo, messagePosition)); } LOGGER.info("success query message by subs={} for topic={}", subName, topicFullName); @@ -404,36 +425,39 @@ public class PulsarOperator { } /** - * Use pulsar admin to get topic partition count + * Get topic partition count. 
*/ - private int getPartitionCount(PulsarAdmin pulsarAdmin, String topicFullName) { - PartitionedTopicMetadata partitionedTopicMetadata; + private int getPartitionCount(PulsarClusterInfo pulsarClusterInfo, String topicFullName) { + PulsarTopicMetadata pulsarTopicMetadata; try { - partitionedTopicMetadata = pulsarAdmin.topics() - .getPartitionedTopicMetadata(topicFullName); + pulsarTopicMetadata = PulsarUtils.getPartitionedTopicMetadata(restTemplate, + pulsarClusterInfo, topicFullName); } catch (Exception e) { String errMsg = "get pulsar partition error "; LOGGER.error(errMsg, e); throw new BusinessException(errMsg + e.getMessage()); } - return partitionedTopicMetadata.partitions > 0 ? partitionedTopicMetadata.partitions : 1; + return pulsarTopicMetadata.getPartitions() > 0 ? pulsarTopicMetadata.getPartitions() : 1; } /** - * Use pulsar admin to query message + * Query pulsar message. */ - private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarAdmin pulsarAdmin, int index, + private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarClusterInfo pulsarClusterInfo, + int index, InlongStreamInfo streamInfo, int messagePosition) { List<BriefMQMessage> briefMQMessages = new ArrayList<>(); try { - Message<byte[]> pulsarMessage = - pulsarAdmin.topics().examineMessage(topicPartition, "latest", messagePosition); - Map<String, String> headers = pulsarMessage.getProperties(); + ResponseEntity<byte[]> httpResponse = + PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo, topicPartition, "latest", + messagePosition); + PulsarMessageInfo messageInfo = PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition); + Map<String, String> headers = messageInfo.getProperties(); int wrapTypeId = Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER, Integer.toString(MessageWrapType.INLONG_MSG_V0.getId()))); DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance( 
MessageWrapType.valueOf(wrapTypeId)); - briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, pulsarMessage.getData(), + briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, messageInfo.getBody(), headers, index)); } catch (Exception e) { LOGGER.warn("query message from pulsar error for groupId = {}, streamId = {}", diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java index 4d4e7051e2..c9d7b4a240 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java @@ -44,12 +44,9 @@ import com.google.common.base.Objects; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.ArrayList; import java.util.List; /** @@ -101,7 +98,7 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator { List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); for (ClusterInfo clusterInfo : clusterInfos) { PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { + try { // create pulsar tenant and namespace if (StringUtils.isBlank(tenant)) { tenant = pulsarCluster.getPulsarTenant(); @@ -109,9 +106,9 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator { // if the group was not successful, need create tenant and namespace if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) { - pulsarOperator.createTenant(pulsarAdmin, tenant); + pulsarOperator.createTenant(pulsarCluster, tenant); String namespace = groupInfo.getMqResource(); - pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace); + pulsarOperator.createNamespace(pulsarCluster, pulsarInfo, tenant, namespace); log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}", groupId, tenant, namespace, pulsarCluster); @@ -223,21 +220,19 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator { */ private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName) throws Exception { - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - String tenant = pulsarInfo.getPulsarTenant(); - if (StringUtils.isBlank(tenant)) { - tenant = pulsarCluster.getPulsarTenant(); - } - String namespace = pulsarInfo.getMqResource(); - PulsarTopicInfo topicInfo = PulsarTopicInfo.builder() - .pulsarTenant(tenant) - .namespace(namespace) - .topicName(topicName) - .queueModule(pulsarInfo.getQueueModule()) - .numPartitions(pulsarInfo.getPartitionNum()) - .build(); - pulsarOperator.createTopic(pulsarAdmin, topicInfo); + String tenant = pulsarInfo.getPulsarTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getPulsarTenant(); } + String namespace = pulsarInfo.getMqResource(); + PulsarTopicInfo topicInfo = PulsarTopicInfo.builder() + .pulsarTenant(tenant) + .namespace(namespace) + .topicName(topicName) + .queueModule(pulsarInfo.getQueueModule()) + .numPartitions(pulsarInfo.getPartitionNum()) + .build(); + pulsarOperator.createTopic(pulsarCluster, topicInfo); } /** @@ -245,41 +240,39 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator { */ private void 
createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName, String streamId) throws Exception { - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - String tenant = pulsarInfo.getPulsarTenant(); - if (StringUtils.isBlank(tenant)) { - tenant = pulsarCluster.getPulsarTenant(); - } - String namespace = pulsarInfo.getMqResource(); - String fullTopicName = tenant + "/" + namespace + "/" + topicName; - boolean exist = pulsarOperator.topicExists(pulsarAdmin, tenant, namespace, topicName, - InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(pulsarInfo.getQueueModule())); - if (!exist) { - String serviceUrl = pulsarCluster.getAdminUrl(); - log.error("topic={} not exists in {}", fullTopicName, serviceUrl); - throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl); - } + String tenant = pulsarInfo.getPulsarTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getPulsarTenant(); + } + String namespace = pulsarInfo.getMqResource(); + String fullTopicName = tenant + "/" + namespace + "/" + topicName; + boolean exist = pulsarOperator.topicExists(pulsarCluster, tenant, namespace, topicName, + InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(pulsarInfo.getQueueModule())); + if (!exist) { + String serviceUrl = pulsarCluster.getAdminUrl(); + log.error("topic={} not exists in {}", fullTopicName, serviceUrl); + throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl); + } - // create subscription for all sinks - String groupId = pulsarInfo.getInlongGroupId(); - List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId); - if (CollectionUtils.isEmpty(streamSinks)) { - log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId); - return; - } + // create subscription for all sinks + String groupId = pulsarInfo.getInlongGroupId(); + List<StreamSink> streamSinks = 
sinkService.listSink(groupId, streamId); + if (CollectionUtils.isEmpty(streamSinks)) { + log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId); + return; + } - // subscription naming rules: clusterTag_topicName_sinkId_consumer_group - String clusterTag = pulsarInfo.getInlongClusterTag(); - for (StreamSink sink : streamSinks) { - String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId()); - pulsarOperator.createSubscription(pulsarAdmin, fullTopicName, pulsarInfo.getQueueModule(), subs); - log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName); - - // insert the consumer group info into the inlong_consume table - Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs); - log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}", - id, subs, groupId, topicName); - } + // subscription naming rules: clusterTag_topicName_sinkId_consumer_group + String clusterTag = pulsarInfo.getInlongClusterTag(); + for (StreamSink sink : streamSinks) { + String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId()); + pulsarOperator.createSubscription(pulsarCluster, fullTopicName, pulsarInfo.getQueueModule(), subs); + log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName); + + // insert the consumer group info into the inlong_consume table + Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs); + log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}", + id, subs, groupId, topicName); } } @@ -289,54 +282,46 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator { */ private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName) throws Exception { - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - String tenant = pulsarInfo.getPulsarTenant(); 
- if (StringUtils.isBlank(tenant)) { - tenant = pulsarCluster.getPulsarTenant(); - } - String namespace = pulsarInfo.getMqResource(); - PulsarTopicInfo topicInfo = PulsarTopicInfo.builder() - .pulsarTenant(tenant) - .namespace(namespace) - .topicName(topicName) - .build(); - pulsarOperator.forceDeleteTopic(pulsarAdmin, topicInfo); + String tenant = pulsarInfo.getPulsarTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getPulsarTenant(); } + String namespace = pulsarInfo.getMqResource(); + PulsarTopicInfo topicInfo = PulsarTopicInfo.builder() + .pulsarTenant(tenant) + .namespace(namespace) + .topicName(topicName) + .build(); + pulsarOperator.forceDeleteTopic(pulsarCluster, topicInfo); } /** * Query latest message from pulsar */ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, - InlongStreamInfo streamInfo, Integer messageCount) throws PulsarClientException { + InlongStreamInfo streamInfo, Integer messageCount) throws Exception { String groupId = streamInfo.getInlongGroupId(); InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo); PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR); - List<BriefMQMessage> briefMQMessages = new ArrayList<>(); - - try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) { - String tenant = inlongPulsarInfo.getPulsarTenant(); - if (StringUtils.isBlank(tenant)) { - tenant = pulsarCluster.getPulsarTenant(); - } - - String namespace = groupInfo.getMqResource(); - String topicName = streamInfo.getMqResource(); - String fullTopicName = tenant + "/" + namespace + "/" + topicName; - String clusterTag = inlongPulsarInfo.getInlongClusterTag(); - String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName); - boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule()); - briefMQMessages = - 
pulsarOperator.queryLatestMessage(pulsarAdmin, fullTopicName, subs, messageCount, streamInfo, - serial); - - // insert the consumer group info into the inlong_consume table - Integer id = consumeService.saveBySystem(groupInfo, topicName, subs); - log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}", - id, subs, groupId, topicName); + String tenant = inlongPulsarInfo.getPulsarTenant(); + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getPulsarTenant(); } + + String namespace = groupInfo.getMqResource(); + String topicName = streamInfo.getMqResource(); + String fullTopicName = tenant + "/" + namespace + "/" + topicName; + String clusterTag = inlongPulsarInfo.getInlongClusterTag(); + String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName); + boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule()); + List<BriefMQMessage> briefMQMessages = pulsarOperator.queryLatestMessage(pulsarCluster, fullTopicName, subs, + messageCount, streamInfo, serial); + + // insert the consumer group info into the inlong_consume table + Integer id = consumeService.saveBySystem(groupInfo, topicName, subs); + log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}", + id, subs, groupId, topicName); return briefMQMessages; } - } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java index 81c3de2b6a..9fd81aac03 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java @@ -17,18 +17,45 @@ package org.apache.inlong.manager.service.resource.queue.pulsar; -import 
org.apache.inlong.manager.common.enums.ErrorCodeEnum; -import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.util.HttpUtils; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarBrokerEntryMetadata; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarLookupTopicInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageMetadata; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.AuthenticationFactory; -import org.apache.pulsar.client.api.PulsarClientException; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; 
+import java.util.Map; +import java.util.Map.Entry; /** * Pulsar connection utils @@ -39,54 +66,921 @@ public class PulsarUtils { private PulsarUtils() { } + private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone( + ZoneId.systemDefault()); + + public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters"; + public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers"; + public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants"; + public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces"; + public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent"; + public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic"; + + private static final Gson GSON = new GsonBuilder().create(); // thread safe + + /** + * Get http headers by token. + * + * @param token pulsar token info + * @return add http headers for token info + */ + private static HttpHeaders getHttpHeaders(String token) { + HttpHeaders headers = new HttpHeaders(); + if (StringUtils.isNotEmpty(token)) { + headers.add("Authorization", "Bearer " + token); + } + return headers; + } + + /** + * Get pulsar cluster info list. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @return list of pulsar cluster infos + * @throws Exception any exception if occurred + */ + public static List<String> getClusters(RestTemplate restTemplate, PulsarClusterInfo clusterInfo) + throws Exception { + final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH; + return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + ArrayList.class); + } + /** - * Get pulsar admin info + * Get the list of active brokers. 
+ * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @return list of pulsar broker infos + * @throws Exception any exception if occurred + */ + public static List<String> getBrokers(RestTemplate restTemplate, PulsarClusterInfo clusterInfo) + throws Exception { + List<String> clusters = getClusters(restTemplate, clusterInfo); + List<String> brokers = new ArrayList<>(); + for (String brokerName : clusters) { + String url = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/" + brokerName; + brokers.addAll( + HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + ArrayList.class)); + } + return brokers; + } + + /** + * Get pulsar tenant info list. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @return list of pulsar tenant infos + * @throws Exception any exception if occurred */ - public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster) throws PulsarClientException { - Preconditions.expectNotBlank(pulsarCluster.getAdminUrl(), ErrorCodeEnum.INVALID_PARAMETER, - "Pulsar adminUrl cannot be empty"); - PulsarAdmin pulsarAdmin; - if (StringUtils.isEmpty(pulsarCluster.getToken())) { - pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl()); + public static List<String> getTenants(RestTemplate restTemplate, PulsarClusterInfo clusterInfo) + throws Exception { + final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH; + return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + ArrayList.class); + } + + /** + * Get pulsar namespace info list. 
+ * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param tenant pulsar tenant name + * @return list of pulsar namespace infos + * @throws Exception any exception if occurred + */ + public static List<String> getNamespaces(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String tenant) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" + tenant; + return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + ArrayList.class); + } + + /** + * Create a new pulsar tenant. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param tenant pulsar tenant name + * @param tenantInfo pulsar tenant info + * @throws Exception any exception if occurred + */ + public static void createTenant(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant, + PulsarTenantInfo tenantInfo) throws Exception { + final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH + "/" + tenant; + HttpHeaders headers = getHttpHeaders(clusterInfo.getToken()); + MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8"); + headers.setContentType(type); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + String param = GSON.toJson(tenantInfo); + HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers); + } + + /** + * Creates a new pulsar namespace with the specified policies. 
+ * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param tenant pulsar namespace name + * @param namespaceName pulsar namespace name + * @param policies pulsar namespace policies info + * @throws Exception any exception if occurred + */ + public static void createNamespace(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant, + String namespaceName, PulsarNamespacePolicies policies) throws Exception { + final String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + InlongConstants.SLASH + tenant + + InlongConstants.SLASH + namespaceName; + HttpHeaders headers = getHttpHeaders(clusterInfo.getToken()); + MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8"); + headers.setContentType(type); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + String param = GSON.toJson(policies); + param = param.replaceAll("messageTtlInSeconds", "message_ttl_in_seconds") + .replaceAll("retentionPolicies", "retention_policies"); + HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers); + } + + /** + * Get the list of topics under a namespace. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param tenant pulsar tenant name + * @param namespace pulsar namespace name + * @return list of pulsar topic infos + * @throws Exception any exception if occurred + */ + public static List<String> getTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant, + String namespace) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace; + return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + ArrayList.class); + } + + /** + * Get the list of partitioned topics under a namespace. 
+ * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param tenant pulsar tenant name + * @param namespace pulsar namespace name + * @return list of pulsar partitioned topic infos + * @throws Exception any exception if occurred + */ + public static List<String> getPartitionedTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String tenant, String namespace) throws Exception { + String url = + clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace + "/partitioned"; + return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + ArrayList.class); + } + + /** + * Create a non-partitioned topic. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @throws Exception any exception if occurred + */ + public static void createNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPath) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath; + HttpUtils.request(restTemplate, url, HttpMethod.PUT, null, getHttpHeaders(clusterInfo.getToken())); + } + + /** + * Create a partitioned topic. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @throws Exception any exception if occurred + */ + public static void createPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPath, Integer numPartitions) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions"; + HttpUtils.request(restTemplate, url, HttpMethod.PUT, numPartitions.toString(), + getHttpHeaders(clusterInfo.getToken())); + } + + /** + * Get the stats-internal for the partitioned topic. 
+ * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @return pulsar internal stat info of partitioned topic + * @throws Exception any exception if occurred + */ + public static JsonObject getInternalStatsPartitionedTopics(RestTemplate restTemplate, + PulsarClusterInfo clusterInfo, String topicPath) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitioned-internalStats"; + return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + JsonObject.class); + } + + /** + * Get partitioned topic metadata. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @return pulsar topic metadata info + * @throws Exception any exception if occurred + */ + public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate restTemplate, + PulsarClusterInfo clusterInfo, String topicPath) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions"; + return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + PulsarTopicMetadata.class); + } + + /** + * Delete a topic. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @throws Exception any exception if occurred + */ + public static void deleteNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPath) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath; + HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, getHttpHeaders(clusterInfo.getToken())); + } + + /** + * Force delete a topic. 
+ * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @throws Exception any exception if occurred + */ + public static void forceDeleteNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPath) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath; + Map<String, Boolean> uriVariables = new HashMap<>(); + uriVariables.put("force", true); + HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables, getHttpHeaders(clusterInfo.getToken())); + } + + /** + * Delete a partitioned topic. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @throws Exception any exception if occurred + */ + public static void deletePartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPath) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions"; + HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, getHttpHeaders(clusterInfo.getToken())); + } + + /** + * Force delete a partitioned topic. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @throws Exception any exception if occurred + */ + public static void forceDeletePartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPath) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions"; + Map<String, Boolean> uriVariables = new HashMap<>(); + uriVariables.put("force", true); + HttpUtils.request(restTemplate, url, HttpMethod.DELETE, uriVariables, getHttpHeaders(clusterInfo.getToken())); + } + + /** + * Delete a partitioned or non-partitioned topic. 
+ * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @param isPartitioned pulsar is partitioned topic + * @throws Exception any exception if occurred + */ + public static void deleteTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath, + boolean isPartitioned) throws Exception { + if (isPartitioned) { + deletePartitionedTopic(restTemplate, clusterInfo, topicPath); } else { - pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl(), pulsarCluster.getToken()); + deleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath); } - return pulsarAdmin; } /** - * Get the pulsar admin from the given service URL. + * Force delete a partitioned or non-partitioned topic. * - * @apiNote It must be closed after use. + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @param isPartitioned pulsar is partitioned topic + * @throws Exception any exception if occurred */ - public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws PulsarClientException { - return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build(); + public static void forceDeleteTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath, + boolean isPartitioned) + throws Exception { + if (isPartitioned) { + forceDeletePartitionedTopic(restTemplate, clusterInfo, topicPath); + } else { + forceDeleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath); + } } /** - * Get the pulsar admin from the given service URL and token. - * <p/> - * Currently only token is supported as an authentication type. + * lookup persistent topic info. * - * @apiNote It must be closed after use. 
+ * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @return pulsar broker url + * @throws Exception any exception if occurred */ - public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String token) throws PulsarClientException { - return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl) - .authentication(AuthenticationFactory.token(token)).build(); + public static String lookupTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath) + throws Exception { + String url = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + "/persistent/" + topicPath; + PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, url, HttpMethod.GET, null, + getHttpHeaders(clusterInfo.getToken()), PulsarLookupTopicInfo.class); + return topicInfo.getBrokerUrl(); } /** - * Get pulsar cluster info list. + * lookup persistent partitioned topic info. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @return map of partitioned topic info + * @throws Exception any exception if occurred + */ + public static Map<String, String> lookupPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPath) throws Exception { + PulsarTopicMetadata metadata = getPartitionedTopicMetadata(restTemplate, clusterInfo, topicPath); + Map<String, String> map = new LinkedHashMap<>(); + for (int i = 0; i < metadata.getPartitions(); i++) { + String partitionTopicName = topicPath + "-partition-" + i; + String partitionUrl = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + "/persistent/" + partitionTopicName; + PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, partitionUrl, HttpMethod.GET, null, + getHttpHeaders(clusterInfo.getToken()), PulsarLookupTopicInfo.class); + map.put(partitionTopicName, topicInfo.getBrokerUrl()); + } + return map; + } + + /** + * 
Get the list of persistent subscriptions for a given topic. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @return list of pulsar topic subscription info + * @throws Exception any exception if occurred + */ + public static List<String> getSubscriptions(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPath) throws Exception { + String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/subscriptions"; + return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()), + ArrayList.class); + } + + /** + * Create a subscription on the topic. + * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPath pulsar topic path + * @param subscription pulsar topic subscription info + * @throws Exception any exception if occurred + */ + public static void createSubscription(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath, + String subscription) throws Exception { + String url = + clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/subscription/" + subscription; + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("entryId", Long.MAX_VALUE); + jsonObject.addProperty("ledgerId", Long.MAX_VALUE); + jsonObject.addProperty("partitionIndex", -1); + HttpHeaders headers = getHttpHeaders(clusterInfo.getToken()); + MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8"); + headers.setContentType(type); + headers.add("Accept", MediaType.APPLICATION_JSON.toString()); + HttpUtils.request(restTemplate, url, HttpMethod.PUT, jsonObject.toString(), headers); + } + + /** + * Examine a specific message on a topic by position relative to the earliest or the latest message. 
+ * + * @param restTemplate spring framework RestTemplate + * @param clusterInfo pulsar cluster info + * @param topicPartition pulsar topic partition info + * @param messageType pulsar message type info + * @param messagePosition pulsar message position info + * @return spring framework HttpEntity + * @throws Exception any exception if occurred + */ + public static ResponseEntity<byte[]> examineMessage(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, + String topicPartition, String messageType, int messagePosition) throws Exception { + StringBuilder urlBuilder = new StringBuilder().append(clusterInfo.getAdminUrl()) + .append(QUERY_PERSISTENT_PATH) + .append("/") + .append(topicPartition) + .append("/examinemessage") + .append("?initialPosition=") + .append(messageType) + .append("&messagePosition=") + .append(messagePosition); + ResponseEntity<byte[]> response = restTemplate.exchange(urlBuilder.toString(), HttpMethod.GET, + new HttpEntity<>(getHttpHeaders(clusterInfo.getToken())), byte[].class); + if (!response.getStatusCode().is2xxSuccessful()) { + log.error("request error for {}, status code {}, body {}", urlBuilder.toString(), response.getStatusCode(), + response.getBody()); + } + return response; + } + + public static PulsarMessageInfo getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic) + throws Exception { + List<PulsarMessageInfo> messages = PulsarUtils.getMessagesFromHttpResponse(response, topic); + if (messages.size() > 0) { + return messages.get(0); + } else { + return null; + } + } + + /** + * Copy from getMessagesFromHttpResponse method of org.apache.pulsar.client.admin.internal.TopicsImpl class. 
+ * + * @param response + * @param topic + * @return + * @throws Exception */ - public static List<String> getPulsarClusters(PulsarAdmin pulsarAdmin) throws PulsarAdminException { - return pulsarAdmin.clusters().getClusters(); + public static List<PulsarMessageInfo> getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic) + throws Exception { + HttpHeaders headers = response.getHeaders(); + String msgId = headers.getFirst("X-Pulsar-Message-ID"); + String brokerEntryTimestamp = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp"); + String brokerEntryIndex = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-index"); + PulsarBrokerEntryMetadata brokerEntryMetadata; + if (brokerEntryTimestamp == null && brokerEntryIndex == null) { + brokerEntryMetadata = null; + } else { + brokerEntryMetadata = new PulsarBrokerEntryMetadata(); + if (brokerEntryTimestamp != null) { + brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp.toString())); + } + if (brokerEntryIndex != null) { + brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex)); + } + } + + PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata(); + Map<String, String> properties = Maps.newTreeMap(); + + Object tmp = headers.getFirst("X-Pulsar-publish-time"); + if (tmp != null) { + messageMetadata.setPublishTime(parse(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-event-time"); + if (tmp != null) { + messageMetadata.setEventTime(parse(tmp.toString())); + } + tmp = headers.getFirst("X-Pulsar-deliver-at-time"); + if (tmp != null) { + messageMetadata.setDeliverAtTime(parse(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-null-value"); + if (tmp != null) { + messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-producer-name"); + if (tmp != null) { + messageMetadata.setProducerName(tmp.toString()); + } + + tmp = headers.getFirst("X-Pulsar-sequence-id"); + if (tmp != null) { + 
messageMetadata.setSequenceId(Long.parseLong(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-replicated-from"); + if (tmp != null) { + messageMetadata.setReplicatedFrom(tmp.toString()); + } + + tmp = headers.getFirst("X-Pulsar-partition-key"); + if (tmp != null) { + messageMetadata.setPartitionKey(tmp.toString()); + } + + tmp = headers.getFirst("X-Pulsar-compression"); + if (tmp != null) { + messageMetadata.setCompression(tmp.toString()); + } + + tmp = headers.getFirst("X-Pulsar-uncompressed-size"); + if (tmp != null) { + messageMetadata.setUncompressedSize(Integer.parseInt(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-encryption-algo"); + if (tmp != null) { + messageMetadata.setEncryptionAlgo(tmp.toString()); + } + + tmp = headers.getFirst("X-Pulsar-partition-key-b64-encoded"); + if (tmp != null) { + messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-marker-type"); + if (tmp != null) { + messageMetadata.setMarkerType(Integer.parseInt(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-txnid-least-bits"); + if (tmp != null) { + messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-txnid-most-bits"); + if (tmp != null) { + messageMetadata.setTxnidMostBits(Long.parseLong(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-highest-sequence-id"); + if (tmp != null) { + messageMetadata.setHighestSequenceId(Long.parseLong(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-uuid"); + if (tmp != null) { + messageMetadata.setUuid(tmp.toString()); + } + + tmp = headers.getFirst("X-Pulsar-num-chunks-from-msg"); + if (tmp != null) { + messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-total-chunk-msg-size"); + if (tmp != null) { + messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp.toString())); + } + + tmp = 
headers.getFirst("X-Pulsar-chunk-id"); + if (tmp != null) { + messageMetadata.setChunkId(Integer.parseInt(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-null-partition-key"); + if (tmp != null) { + messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-Base64-encryption-param"); + if (tmp != null) { + messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-Base64-ordering-key"); + if (tmp != null) { + messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded"); + if (tmp != null) { + messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp.toString())); + } + + tmp = headers.getFirst("X-Pulsar-Base64-encryption-param"); + if (tmp != null) { + messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString())); + } + + List<String> tmpList = (List) headers.get("X-Pulsar-replicated-to"); + if (ObjectUtils.isNotEmpty(tmpList)) { + if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) { + messageMetadata.setReplicateTos(Lists.newArrayList(tmpList)); + } else { + messageMetadata.getReplicateTos().addAll(tmpList); + } + } + + tmp = headers.getFirst("X-Pulsar-batch-size"); + if (tmp != null) { + properties.put("X-Pulsar-batch-size", (String) tmp); + } + + for (Entry<String, List<String>> entry : headers.entrySet()) { + if (entry.getKey().contains("X-Pulsar-PROPERTY-")) { + String keyName = entry.getKey().substring("X-Pulsar-PROPERTY-".length()); + properties.put(keyName, (String) ((List) entry.getValue()).get(0)); + } + } + + tmp = headers.getFirst("X-Pulsar-num-batch-message"); + if (tmp != null) { + properties.put("X-Pulsar-num-batch-message", (String) tmp); + } + boolean isEncrypted = false; + tmp = headers.getFirst("X-Pulsar-Is-Encrypted"); + if (tmp != null) { + isEncrypted = Boolean.parseBoolean(tmp.toString()); + } 
+ + if (!isEncrypted && headers.get("X-Pulsar-num-batch-message") != null) { + return getIndividualMsgsFromBatch(topic, msgId, response.getBody(), properties, messageMetadata, + brokerEntryMetadata); + } + + PulsarMessageInfo messageInfo = new PulsarMessageInfo(); + messageInfo.setTopic(topic); + messageInfo.setMessageId(msgId); + messageInfo.setProperties(messageMetadata.getProperties()); + messageInfo.setBody(response.getBody()); + messageInfo.setPulsarMessageMetadata(messageMetadata); + if (brokerEntryMetadata != null) { + messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata); + } + return Collections.singletonList(messageInfo); + } + + private static long parse(String datetime) throws DateTimeParseException { + Instant instant = Instant.from(DATE_FORMAT.parse(datetime)); + return instant.toEpochMilli(); + } + + /** + * Copy from getIndividualMsgsFromBatch method of org.apache.pulsar.client.admin.internal.TopicsImpl class. + * + * @param topic + * @param msgId + * @param data + * @param properties + * @param metadata + * @param brokerMetadata + * @return + */ + private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data, + Map<String, String> properties, PulsarMessageMetadata metadata, PulsarBrokerEntryMetadata brokerMetadata) { + List<PulsarMessageInfo> ret = new ArrayList<>(); + int batchSize = Integer.parseInt(properties.get("X-Pulsar-num-batch-message")); + ByteBuffer buffer = ByteBuffer.wrap(data); + for (int i = 0; i < batchSize; ++i) { + String batchMsgId = msgId + ":" + i; + PulsarMessageMetadata singleMetadata = new PulsarMessageMetadata(); + singleMetadata.setProperties(properties); + ByteBuffer singleMessagePayload = deSerializeSingleMessageInBatch(buffer, singleMetadata, i, batchSize); + PulsarMessageInfo messageInfo = new PulsarMessageInfo(); + messageInfo.setTopic(topic); + messageInfo.setMessageId(batchMsgId); + messageInfo.setProperties(singleMetadata.getProperties()); + 
messageInfo.setPulsarMessageMetadata(metadata); + messageInfo.setBody(singleMessagePayload.array()); + if (brokerMetadata != null) { + messageInfo.setPulsarBrokerEntryMetadata(brokerMetadata); + } + ret.add(messageInfo); + } + buffer.clear(); + return ret; + } + + /** + * Copy from deSerializeSingleMessageInBatch method of org.apache.pulsar.common.protocol.Commands class. + * + * @param uncompressedPayload + * @param metadata + * @param index + * @param batchSize + * @return + */ + private static ByteBuffer deSerializeSingleMessageInBatch(ByteBuffer uncompressedPayload, + PulsarMessageMetadata metadata, int index, int batchSize) { + int singleMetaSize = (int) uncompressedPayload.getInt(); + metaDataParseFrom(metadata, uncompressedPayload, singleMetaSize); + int singleMessagePayloadSize = metadata.getPayloadSize(); + int readerIndex = uncompressedPayload.position(); + byte[] singleMessagePayload = new byte[singleMessagePayloadSize]; + uncompressedPayload.get(singleMessagePayload); + if (index < batchSize) { + uncompressedPayload.position(readerIndex + singleMessagePayloadSize); + } + return ByteBuffer.wrap(singleMessagePayload); + } + + /** + * Copy from parseFrom method of org.apache.pulsar.common.api.proto.SingleMessageMetadata class. 
+ * + * @param metadata + * @param buffer + * @param size + */ + private static void metaDataParseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) { + int endIdx = size + buffer.position(); + while (buffer.position() < endIdx) { + int tag = readVarInt(buffer); + switch (tag) { + case 10: + int _propertiesSize = readVarInt(buffer); + parseFrom(metadata, buffer, _propertiesSize); + break; + case 18: + int _partitionKeyBufferLen = readVarInt(buffer); + byte[] partitionKeyArray = new byte[_partitionKeyBufferLen]; + buffer.get(partitionKeyArray); + metadata.setPartitionKey(new String(partitionKeyArray)); + break; + case 24: + int payloadSize = readVarInt(buffer); + metadata.setPayloadSize(payloadSize); + break; + case 32: + boolean compactedOut = readVarInt(buffer) == 1; + metadata.setCompactedOut(compactedOut); + break; + case 40: + long eventTime = readVarInt64(buffer); + metadata.setEventTime(eventTime); + break; + case 48: + boolean partitionKeyB64Encoded = readVarInt(buffer) == 1; + metadata.setPartitionKeyB64Encoded(partitionKeyB64Encoded); + break; + case 58: + int _orderingKeyLen = readVarInt(buffer); + byte[] orderingKeyArray = new byte[_orderingKeyLen]; + metadata.setOrderingKey(orderingKeyArray); + break; + case 64: + long sequenceId = readVarInt64(buffer); + metadata.setSequenceId(sequenceId); + break; + case 72: + boolean nullValue = readVarInt(buffer) == 1; + metadata.setNullValue(nullValue); + break; + case 80: + boolean nullPartitionKey = readVarInt(buffer) == 1; + metadata.setNullPartitionKey(nullPartitionKey); + break; + default: + skipUnknownField(tag, buffer); + } + } } /** - * Get pulsar cluster service url. + * Copy from readVarInt method of org.apache.pulsar.common.api.proto.LightProtoCodec class. 
+ * + * @param buf + * @return */ - public static String getServiceUrl(PulsarAdmin pulsarAdmin, String pulsarCluster) throws PulsarAdminException { - return pulsarAdmin.clusters().getCluster(pulsarCluster).getServiceUrl(); + private static int readVarInt(ByteBuffer buf) { + byte tmp = buf.get(); + if (tmp >= 0) { + return tmp; + } else { + int result = tmp & 127; + if ((tmp = buf.get()) >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 127) << 7; + if ((tmp = buf.get()) >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 127) << 14; + if ((tmp = buf.get()) >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 127) << 21; + result |= (tmp = buf.get()) << 28; + if (tmp < 0) { + for (int i = 0; i < 5; ++i) { + if (buf.get() >= 0) { + return result; + } + } + throw new IllegalArgumentException("Encountered a malformed varint."); + } + } + } + } + return result; + } } + /** + * Copy from readVarInt64 method of org.apache.pulsar.common.api.proto.LightProtoCodec class. + * + * @param buf + * @return + */ + private static long readVarInt64(ByteBuffer buf) { + int shift = 0; + for (long result = 0L; shift < 64; shift += 7) { + byte b = buf.get(); + result |= (long) (b & 127) << shift; + if ((b & 128) == 0) { + return result; + } + } + throw new IllegalArgumentException("Encountered a malformed varint."); + } + + /** + * Copy from getTagType method of org.apache.pulsar.common.api.proto.LightProtoCodec class. + * + * @param tag + * @return + */ + private static int getTagType(int tag) { + return tag & 7; + } + + /** + * Copy from skipUnknownField method of org.apache.pulsar.common.api.proto.LightProtoCodec class. 
+ * + * @param tag + * @param buffer + */ + private static void skipUnknownField(int tag, ByteBuffer buffer) { + int tagType = getTagType(tag); + switch (tagType) { + case 0: + readVarInt(buffer); + break; + case 1: + buffer.get(new byte[8]); + break; + case 2: + int len = readVarInt(buffer); + buffer.get(new byte[len]); + break; + case 3: + case 4: + default: + throw new IllegalArgumentException("Invalid unknonwn tag type: " + tagType); + case 5: + buffer.get(new byte[4]); + } + } + + /** + * Copy from parseFrom method of org.apache.pulsar.common.api.proto.KeyValue class. + * + * @param metadata + * @param buffer + * @param size + */ + private static void parseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) { + if (ObjectUtils.isEmpty(metadata.getProperties())) { + metadata.setProperties(new HashMap<>()); + } + Map<String, String> properties = metadata.getProperties(); + int endIdx = buffer.position() + size; + String key = null; + String value = null; + while (buffer.position() < endIdx) { + int tag = readVarInt(buffer); + if (StringUtils.isNotEmpty(key) && StringUtils.isNotEmpty(value)) { + properties.put(key, value); + key = null; + value = null; + } + switch (tag) { + case 10: + int keyBufferLen = readVarInt(buffer); + byte[] keyArray = new byte[keyBufferLen]; + buffer.get(keyArray); + key = new String(keyArray); + break; + case 18: + int valueBufferLen = readVarInt(buffer); + byte[] valueArray = new byte[valueBufferLen]; + buffer.get(valueArray); + value = new String(valueArray); + break; + default: + skipUnknownField(tag, buffer); + } + } + if (StringUtils.isNotEmpty(key) && StringUtils.isNotEmpty(value)) { + properties.put(key, value); + } + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java index a38b391131..f790604fc8 
100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java @@ -33,13 +33,9 @@ import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO; import org.apache.inlong.manager.service.node.DataNodeOperateHelper; import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator; -import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils; import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.PulsarClientException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -86,12 +82,11 @@ public class PulsarResourceOperator extends AbstractStandaloneSinkResourceOperat PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().adminUrl(pulsarDataNodeInfo.getAdminUrl()) .token(pulsarDataNodeInfo.getToken()).build(); - PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo); // create pulsar tenant - pulsarOperator.createTenant(pulsarAdmin, pulsarSinkDTO.getPulsarTenant()); + pulsarOperator.createTenant(pulsarClusterInfo, pulsarSinkDTO.getPulsarTenant()); // use default config to create namespace InlongPulsarInfo pulsarInfo = new InlongPulsarInfo(); - pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, pulsarSinkDTO.getPulsarTenant(), + pulsarOperator.createNamespace(pulsarClusterInfo, pulsarInfo, pulsarSinkDTO.getPulsarTenant(), pulsarSinkDTO.getNamespace()); String queueModel = pulsarSinkDTO.getPartitionNum() > 0 ? 
InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL : InlongConstants.PULSAR_QUEUE_TYPE_SERIAL; @@ -102,15 +97,14 @@ public class PulsarResourceOperator extends AbstractStandaloneSinkResourceOperat .queueModule(queueModel) .build(); // create topic - pulsarOperator.createTopic(pulsarAdmin, topicInfo); + pulsarOperator.createTopic(pulsarClusterInfo, topicInfo); final String info = "success to create Pulsar resource"; sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); LOG.info(info + " for sinkInfo={}", sinkInfo); - } catch (PulsarClientException | PulsarAdminException e) { + } catch (Exception e) { LOG.error("init pulsar admin error", e); throw new BusinessException(); } - } private PulsarDataNodeDTO getPulsarDataNodeInfo(SinkInfo sinkInfo) { diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java index d248286820..0676b944f8 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java @@ -17,39 +17,578 @@ package org.apache.inlong.manager.service.queue; +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarPersistencePolicies; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarRetentionPolicies; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo; +import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata; import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils; 
-import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; -import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; -import org.junit.jupiter.api.Assertions; +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import org.apache.commons.lang3.StringUtils; +import org.junit.Ignore; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.springframework.util.ReflectionUtils; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; -import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static 
org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.powermock.api.mockito.PowerMockito.when;

/**
 * Test class for Pulsar utils.
 *
 * <p>Most cases talk to a Pulsar container that is started once for the whole class and published on the
 * fixed host ports 6650 and 8080. {@link #testExamineMessage()} is the exception: it runs against a mocked
 * {@link RestTemplate}.
 */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class PulsarUtilsTest {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarUtilsTest.class);

    public static final Network NETWORK = Network.newNetwork();

    private static final String INTER_CONTAINER_PULSAR_ALIAS = "pulsar";

    private static final Gson GSON = new GsonBuilder().create(); // thread safe

    private static final String DEFAULT_SERVICE_URL = "http://127.0.0.1:8080";

    private static final String DEFAULT_TENANT = "public";

    private static final String DEFAULT_CREATE_TENANT = "test_tenant";

    private static final String DEFAULT_NAMESPACE = "default";

    private static final int DEFAULT_PARTITIONS_NUM = 3;

    private static final String PERSISTENT_TOPIC_HEAD = "persistent://";

    /** Mocked HTTP client, used only by {@link #testExamineMessage()}. */
    @Mock
    private RestTemplate restTemplate;

    @Mock
    private ResponseEntity<byte[]> byteExchange;

    /** Mocked cluster info; {@link #before()} stubs its admin URL and token. */
    @Mock
    private PulsarClusterInfo pulsarClusterInfo;

    private static PulsarNamespacePolicies policies;

    private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(
            DockerImageName.parse("apachepulsar/pulsar:2.8.2")
                    .asCompatibleSubstituteFor("apachepulsar/pulsar"))
                            .withNetwork(NETWORK)
                            .withAccessToHost(true)
                            .withNetworkAliases(INTER_CONTAINER_PULSAR_ALIAS)
                            .withLogConsumer(new Slf4jLogConsumer(LOG));

    /** Real HTTP client used against the container. */
    private static final RestTemplate client = new RestTemplate();

    /** Cluster info pointing at the container's admin endpoint. */
    private static final PulsarClusterInfo pulsarCluster = new PulsarClusterInfo();

    @BeforeAll
    public static void beforeAll() {
        policies = new PulsarNamespacePolicies();
        policies.setMessageTtlInSeconds(10);
        PulsarPersistencePolicies persistencePolicies = new PulsarPersistencePolicies();
        persistencePolicies.setBookkeeperEnsemble(3);
        persistencePolicies.setBookkeeperAckQuorum(3);
        persistencePolicies.setBookkeeperWriteQuorum(3);
        persistencePolicies.setManagedLedgerMaxMarkDeleteRate(4.0);
        policies.setPersistence(persistencePolicies);
        PulsarRetentionPolicies retentionPolicies = new PulsarRetentionPolicies();
        retentionPolicies.setRetentionSizeInMB(2048L);
        retentionPolicies.setRetentionTimeInMinutes(500);
        policies.setRetentionPolicies(retentionPolicies);

        PULSAR_CONTAINER.setPortBindings(Arrays.asList("6650:6650", "8080:8080"));
        Startables.deepStart(Stream.of(PULSAR_CONTAINER)).join();
        LOG.info("Containers are started.");
        pulsarCluster.setAdminUrl(DEFAULT_SERVICE_URL);
    }

    @AfterAll
    public static void teardown() {
        PULSAR_CONTAINER.stop();
    }

    @BeforeEach
    public void before() {
        when(pulsarClusterInfo.getAdminUrl()).thenReturn(DEFAULT_SERVICE_URL);
        when(pulsarClusterInfo.getToken()).thenReturn("testtoken");
    }

    /**
     * Test cases for {@link PulsarUtils#getClusters(RestTemplate, PulsarClusterInfo)}.
     *
     * @throws Exception
     */
    @Test
    public void testGetClusters() throws Exception {
        final String result = "[\"standalone\"]";
        List<String> expected = GSON.fromJson(result, ArrayList.class);
        List<String> clusters = PulsarUtils.getClusters(client, pulsarCluster);
        assertEquals(expected.size(), clusters.size());
        assertEquals(expected, clusters);
    }

    /**
     * Test cases for {@link PulsarUtils#getBrokers(RestTemplate, PulsarClusterInfo)}.
     *
     * @throws Exception
     */
    @Test
    public void testGetBrokers() throws Exception {
        final String result = "[\"localhost:8080\"]";
        List<String> expected = GSON.fromJson(result, ArrayList.class);
        List<String> brokers = PulsarUtils.getBrokers(client, pulsarCluster);
        assertEquals(expected.size(), brokers.size());
        assertEquals(expected, brokers);
    }

    /**
     * Test cases for {@link PulsarUtils#getTenants(RestTemplate, PulsarClusterInfo)}.
     *
     * @throws Exception
     */
    @Test
    public void testGetTenants() throws Exception {
        List<String> tenants = PulsarUtils.getTenants(client, pulsarCluster);
        assertNotNull(tenants);
    }

    /**
     * Test cases for {@link PulsarUtils#getNamespaces(RestTemplate, PulsarClusterInfo, String)}.
     *
     * @throws Exception
     */
    @Test
    public void testGetNamespaces() throws Exception {
        List<String> namespaces = PulsarUtils.getNamespaces(client, pulsarCluster, DEFAULT_TENANT);
        assertNotNull(namespaces);
    }

    /**
     * Test cases for {@link PulsarUtils#createTenant}.
     *
     * @throws Exception
     */
    @Test
    public void testCreateTenant() throws Exception {
        PulsarTenantInfo pulsarTenantInfo = new PulsarTenantInfo();
        pulsarTenantInfo.setAdminRoles(Sets.newHashSet());
        pulsarTenantInfo.setAllowedClusters(Sets.newHashSet("standalone"));
        PulsarUtils.createTenant(client, pulsarCluster, DEFAULT_CREATE_TENANT, pulsarTenantInfo);
        Thread.sleep(500);
        List<String> tenants = PulsarUtils.getTenants(client, pulsarCluster);
        assertTrue(tenants.contains(DEFAULT_CREATE_TENANT));
    }

    /**
     * Test cases for {@link PulsarUtils#createNamespace(RestTemplate, PulsarClusterInfo, String, String, PulsarNamespacePolicies)}.
     *
     * @throws Exception
     */
    @Test
    public void testCreateNamespace() throws Exception {
        final String namespaceName = "testCreateNamespace";
        final String namespaceInfo = DEFAULT_TENANT + InlongConstants.SLASH + namespaceName;
        // NOTE(review): the JSON payload and the headers built below are never passed to createNamespace;
        // they look like leftovers from a mocked variant of this test - confirm and remove.
        String param = GSON.toJson(policies);
        param = param.replaceAll("messageTtlInSeconds", "message_ttl_in_seconds")
                .replaceAll("retentionPolicies", "retention_policies");
        final HttpHeaders headers = new HttpHeaders();
        if (StringUtils.isNotEmpty(pulsarClusterInfo.getToken())) {
            headers.add("Authorization", "Bearer " + pulsarClusterInfo.getToken());
        }
        final MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
        headers.setContentType(type);
        headers.add("Accept", MediaType.APPLICATION_JSON.toString());

        PulsarUtils.createNamespace(client, pulsarCluster, DEFAULT_TENANT, namespaceName, policies);
        Thread.sleep(500);
        List<String> namespaces = PulsarUtils.getNamespaces(client, pulsarCluster, DEFAULT_TENANT);
        assertTrue(namespaces.contains(namespaceInfo));
    }

    /**
     * Test cases for {@link PulsarUtils#getTopics(RestTemplate, PulsarClusterInfo, String, String)}.
     *
     * @throws Exception
     */
    @Test
    public void testGetTopics() throws Exception {
        List<String> topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT,
                DEFAULT_NAMESPACE);
        // the list may legitimately be empty, so only its presence is checked
        assertNotNull(topics);
    }

    /**
     * Test cases for {@link PulsarUtils#getPartitionedTopics(RestTemplate, PulsarClusterInfo, String, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getList</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/partitioned
     * method is: GET
     * request body: none
     * response : ["string"]
     *
     * @throws Exception
     */
    @Test
    public void testGetPartitionedTopics() throws Exception {
        List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
                DEFAULT_NAMESPACE);
        // the list may legitimately be empty, so only its presence is checked
        assertNotNull(topics);
    }

    /**
     * Test cases for {@link PulsarUtils#createNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
     *
     * @throws Exception
     */
    @Test
    public void testCreateNonPartitionedTopic() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testCreateNonPartitionedTopic";
        final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;

        Thread.sleep(500);
        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        List<String> topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT,
                DEFAULT_NAMESPACE);
        assertTrue(topics.contains(topicInfo));
    }

    /**
     * Test cases for {@link PulsarUtils#createPartitionedTopic(RestTemplate, PulsarClusterInfo, String, Integer)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_createPartitionedTopic</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
     * method is: PUT
     * request body: integer <int32> (The number of partitions for the topic)
     *
     * @throws Exception
     */
    @Test
    public void testCreatePartitionedTopic() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testCreatePartitionedTopic";
        final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;

        PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, DEFAULT_PARTITIONS_NUM);
        Thread.sleep(500);
        List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
                DEFAULT_NAMESPACE);
        assertTrue(topics.contains(topicInfo));
    }

    /**
     * Test cases for {@link PulsarUtils#getInternalStatsPartitionedTopics(RestTemplate, PulsarClusterInfo, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getPartitionedStatsInternal</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-internalStats
     * method is: GET
     * request body: none
     * response : json
     *
     * @throws Exception
     */
    @Test
    public void testGetInternalStatsPartitionedTopics() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testGetInternalStatsPartitionedTopics";

        PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, DEFAULT_PARTITIONS_NUM);
        Thread.sleep(500);
        JsonObject stats = PulsarUtils.getInternalStatsPartitionedTopics(client, pulsarCluster, topicPath);
        assertNotNull(stats);
    }

    /**
     * Test cases for {@link PulsarUtils#getPartitionedTopicMetadata(RestTemplate, PulsarClusterInfo, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getPartitionedMetadata</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
     * method is: GET
     * request body: none
     * response : json
     *
     * @throws Exception
     */
    @Test
    public void testGetPartitionedTopicMetadata() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testGetPartitionedTopicMetadata";
        PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, DEFAULT_PARTITIONS_NUM);
        Thread.sleep(500);
        PulsarTopicMetadata metadata = PulsarUtils.getPartitionedTopicMetadata(client, pulsarCluster,
                topicPath);
        assertNotNull(metadata);
    }

    /**
     * Test cases for {@link PulsarUtils#deleteNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deleteTopic</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}
     * method is: DELETE
     * request body: none
     *
     * @throws Exception
     */
    @Test
    public void testDeleteNonPartitionedTopic() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testDeleteNonPartitionedTopic";
        final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;

        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        List<String> topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
        assertTrue(topics.contains(topicInfo));

        PulsarUtils.deleteNonPartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
        assertTrue(!topics.contains(topicInfo));
    }

    /**
     * Test cases for {@link PulsarUtils#forceDeleteNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deleteTopic</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}
     * method is: DELETE
     * request body: none
     *
     * @throws Exception
     */
    @Test
    public void testForceDeleteNonPartitionedTopic() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testForceDeleteNonPartitionedTopic";
        final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;

        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        List<String> topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
        assertTrue(topics.contains(topicInfo));

        PulsarUtils.forceDeleteNonPartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
        assertTrue(!topics.contains(topicInfo));
    }

    /**
     * Test cases for {@link PulsarUtils#deletePartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deletePartitionedTopic</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
     * method is: DELETE
     * request body: none
     *
     * @throws Exception
     */
    @Test
    public void testDeletePartitionedTopic() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testDeletePartitionedTopic";
        final int numPartitions = 3;
        final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;

        PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, numPartitions);
        Thread.sleep(500);
        List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
                DEFAULT_NAMESPACE);
        assertTrue(topics.contains(topicInfo));

        PulsarUtils.deletePartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
        assertTrue(!topics.contains(topicInfo));
    }

    /**
     * Test cases for {@link PulsarUtils#forceDeletePartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deletePartitionedTopic</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
     * method is: DELETE
     * request body: none
     *
     * @throws Exception
     */
    @Test
    public void testForceDeletePartitionedTopic() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testForceDeletePartitionedTopic";
        final int numPartitions = 3;
        final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;

        PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, numPartitions);
        Thread.sleep(500);
        List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
                DEFAULT_NAMESPACE);
        assertTrue(topics.contains(topicInfo));

        // exercise the force variant; the plain delete is already covered by testDeletePartitionedTopic
        PulsarUtils.forceDeletePartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
        assertTrue(!topics.contains(topicInfo));
    }

    /**
     * Test cases for {@link PulsarUtils#lookupTopic(RestTemplate, PulsarClusterInfo, String)}.
     *
     * @throws Exception
     */
    @Test
    public void testLookupTopic() throws Exception {
        final String topicPath =
                DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH + "testLookupTopic";
        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        String actual = PulsarUtils.lookupTopic(client, pulsarCluster, topicPath);
        assertNotNull(actual);
    }

    /**
     * Test cases for {@link PulsarUtils#lookupPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
     *
     * @throws Exception
     */
    @Test
    public void testLookupPartitionedTopic() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testLookupPartitionedTopic";
        // NOTE(review): the topic created here is non-partitioned although the method under test is the
        // partitioned lookup - confirm whether createPartitionedTopic was intended.
        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        Map<String, String> actual = PulsarUtils.lookupPartitionedTopic(client, pulsarCluster, topicPath);
        assertNotNull(actual);
    }

    /**
     * Test cases for {@link PulsarUtils#getSubscriptions(RestTemplate, PulsarClusterInfo, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getSubscriptions</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions
     * method is: GET
     * response body: ["string"]
     *
     * @throws Exception
     */
    @Test
    public void testGetSubscriptions() throws Exception {
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testGetSubscriptions";
        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
        Thread.sleep(500);
        List<String> actual = PulsarUtils.getSubscriptions(client, pulsarCluster, topicPath);
        // a freshly created topic may have no subscriptions, so only the presence of the list is checked
        assertNotNull(actual);
    }

    /**
     * Test cases for {@link PulsarUtils#createSubscription(RestTemplate, PulsarClusterInfo, String, String)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_createSubscription</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subscriptionName}
     * method is: PUT
     * response body: json
     *
     * @throws Exception
     */
    @Test
    public void testCreateSubscription() throws Exception {
        final String subscriptionName = "test_subscription";
        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testCreateSubscription";
        PulsarUtils.createSubscription(client, pulsarCluster, topicPath, subscriptionName);
        Thread.sleep(500);
        List<String> actual = PulsarUtils.getSubscriptions(client, pulsarCluster, topicPath);
        assertTrue(actual.contains(subscriptionName));
    }

    /**
     * Test cases for {@link PulsarUtils#examineMessage(RestTemplate, PulsarClusterInfo, String, String, int)}.
     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_examineMessage</a>
     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/examinemessage
     * method is: GET
     * request parameters:
     *   - initialPosition: Relative start position to examine message.It can be 'latest' or 'earliest'
     *   - messagePosition: The position of messages (default 1)
     *   - authoritative: Whether leader broker redirected this call to this broker. For internal use.
     * response body: byteArray
     *
     * @throws Exception
     */
    @Test
    public void testExamineMessage() throws Exception {
        /*
         * Since admin API cannot send messages to the topic, this test case will be simulated using mockito.
         */
        final String messageType = "latest";
        final int messagePosition = 1;
        String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
                + "testtopic-partition-1";
        StringBuilder urlBuilder = new StringBuilder().append(DEFAULT_SERVICE_URL)
                .append(PulsarUtils.QUERY_PERSISTENT_PATH).append("/").append(topicPath).append("/examinemessage")
                .append("?initialPosition=").append(messageType).append("&messagePosition=").append(messagePosition);
        final String expected = "test message!";

        when(restTemplate.exchange(eq(urlBuilder.toString()), eq(HttpMethod.GET), any(HttpEntity.class),
                eq(byte[].class))).thenReturn(byteExchange);
        when(byteExchange.getStatusCode()).thenReturn(HttpStatus.OK);
        when(byteExchange.getBody()).thenReturn(expected.getBytes(StandardCharsets.UTF_8));

        ResponseEntity<byte[]> response = PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo, topicPath,
                messageType, messagePosition);
        // decode with the same charset that was used to encode the stubbed body
        assertEquals(expected, new String(response.getBody(), StandardCharsets.UTF_8));
    }

    /**
     * The case only supports local testing.
     *
     * <p>NOTE(review): {@code @Ignore} is the JUnit 4 annotation and this method carries no {@code @Test}, so
     * JUnit Jupiter never discovers it; it only runs when invoked by hand. Consider {@code @Test} together
     * with Jupiter's {@code @Disabled} if it should show up as a skipped test.
     *
     * @throws Exception
     */
    @Ignore
    public void localTest() throws Exception {
        RestTemplate restTemplate = new RestTemplate();
        PulsarClusterInfo pulsarClusterInfo = new PulsarClusterInfo();
        pulsarClusterInfo.setAdminUrl("http://127.0.0.1:8080");
        String topic = "public/test_pulsar_group/test_pulsar_stream";

        ResponseEntity<byte[]> pulsarMessage =
                PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo,
                        topic, "latest", 1);
        List<PulsarMessageInfo> result = PulsarUtils.getMessagesFromHttpResponse(pulsarMessage, topic);
        System.out.println(result);
    }
}