This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3222c561a77 [cleanup] Convert 10 test classes to SharedPulsarBaseTest (#25327)
3222c561a77 is described below

commit 3222c561a779bae3eea5a56569cdb4eb70b171a8
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 16 17:49:23 2026 -0700

    [cleanup] Convert 10 test classes to SharedPulsarBaseTest (#25327)
---
 .../org/apache/pulsar/client/api/BytesKeyTest.java | 23 ++++----------
 .../pulsar/client/api/ConsumerAckListTest.java     | 26 ++++------------
 .../pulsar/client/api/ConsumerCleanupTest.java     | 29 +++++-------------
 .../pulsar/client/api/CustomMessageIdTest.java     | 23 +++-----------
 .../client/api/FailoverSubscriptionTest.java       | 22 +++-----------
 .../api/NonPartitionedTopicExpectedTest.java       | 35 +++++++---------------
 .../client/api/PersistentTopicTerminateTest.java   | 24 +++------------
 .../pulsar/client/api/ProducerCleanupTest.java     | 23 ++------------
 .../pulsar/client/api/ProducerQueueSizeTest.java   | 20 ++-----------
 .../client/api/ReplicateSubscriptionTest.java      | 27 +++--------------
 10 files changed, 52 insertions(+), 200 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java
index fdc34add89b..69453f9265e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java
@@ -20,38 +20,25 @@ package org.apache.pulsar.client.api;
 
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class BytesKeyTest extends ProducerConsumerBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class BytesKeyTest extends SharedPulsarBaseTest {
 
     private void byteKeysTest(boolean batching) throws Exception {
         Random r = new Random(0);
+        String topic = newTopicName();
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .subscriptionName("my-subscriber-name").subscribe();
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
             .enableBatching(batching)
             .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS)
             .batchingMaxMessages(Integer.MAX_VALUE)
-            .topic("persistent://my-property/my-ns/my-topic1").create();
+            .topic(topic).create();
 
         byte[] byteKey = new byte[1000];
         r.nextBytes(byteKey);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
index ec5540dbbd0..3d085425b13 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
@@ -25,27 +25,13 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class ConsumerAckListTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ConsumerAckListTest extends SharedPulsarBaseTest {
 
     @DataProvider(name = "ackReceiptEnabled")
     public Object[][] ackReceiptEnabled() {
@@ -61,7 +47,7 @@ public class ConsumerAckListTest extends ProducerConsumerBase {
     }
 
     private void ackListMessage(boolean isBatch, boolean isPartitioned, boolean ackReceiptEnabled) throws Exception {
-        final String topic = "persistent://my-property/my-ns/batch-ack-" + UUID.randomUUID();
+        final String topic = newTopicName();
         final String subName = "testBatchAck-sub" + UUID.randomUUID();
         final int messageNum = ThreadLocalRandom.current().nextInt(50, 100);
         if (isPartitioned) {
@@ -113,9 +99,9 @@ public class ConsumerAckListTest extends ProducerConsumerBase {
     @Test(timeOut = 30000)
     public void testAckMessageInAnotherTopic() throws Exception {
         final String[] topics = {
-                "persistent://my-property/my-ns/test-ack-message-in-other-topic1" + UUID.randomUUID(),
-                "persistent://my-property/my-ns/test-ack-message-in-other-topic2" + UUID.randomUUID(),
-                "persistent://my-property/my-ns/test-ack-message-in-other-topic3" + UUID.randomUUID()
+                newTopicName(),
+                newTopicName(),
+                newTopicName()
         };
         @Cleanup final Consumer<String> allTopicsConsumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topics)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
index 0cbfde06381..c043182f6b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
@@ -19,32 +19,16 @@
 package org.apache.pulsar.client.api;
 
 import io.netty.util.HashedWheelTimer;
-import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class ConsumerCleanupTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        // use Pulsar binary lookup since the HTTP client shares the Pulsar client timer
-        isTcpLookup = true;
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ConsumerCleanupTest extends SharedPulsarBaseTest {
 
     @DataProvider(name = "ackReceiptEnabled")
     public Object[][] ackReceiptEnabled() {
@@ -55,9 +39,12 @@ public class ConsumerCleanupTest extends ProducerConsumerBase {
     public void testAllTimerTaskShouldCanceledAfterConsumerClosed(boolean ackReceiptEnabled)
             throws PulsarClientException, InterruptedException {
         @Cleanup
-        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
+        PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .statsInterval(1, TimeUnit.SECONDS)
+                .build();
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://public/default/" + UUID.randomUUID().toString())
+                .topic(newTopicName())
                 .subscriptionName("test")
                 .isAckReceiptEnabled(ackReceiptEnabled)
                 .subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
index d64a2d8b7e4..e8a73aa5a32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
@@ -25,26 +25,12 @@ import static org.testng.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class CustomMessageIdTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class CustomMessageIdTest extends SharedPulsarBaseTest {
 
     @DataProvider
     public static Object[][] enableBatching() {
@@ -56,7 +42,7 @@ public class CustomMessageIdTest extends ProducerConsumerBase {
 
     @Test
     public void testSeek() throws Exception {
-        final var topic = "persistent://my-property/my-ns/test-seek-" + System.currentTimeMillis();
+        final var topic = newTopicName();
         @Cleanup final var producer = pulsarClient.newProducer(Schema.INT32).topic(topic).create();
         final var msgIds = new ArrayList<SimpleMessageIdImpl>();
         for (int i = 0; i < 10; i++) {
@@ -72,8 +58,7 @@ public class CustomMessageIdTest extends ProducerConsumerBase {
 
     @Test(dataProvider = "enableBatching")
     public void testAcknowledgment(boolean enableBatching) throws Exception {
-        final var topic = "persistent://my-property/my-ns/test-ack-"
-                + enableBatching + System.currentTimeMillis();
+        final var topic = newTopicName();
         final var producer = pulsarClient.newProducer(Schema.INT32)
                 .topic(topic)
                 .enableBatching(enableBatching)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java
index 6263193b0a4..04d6aaf39ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java
@@ -23,38 +23,24 @@ import java.util.HashMap;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-api")
-public class FailoverSubscriptionTest extends ProducerConsumerBase {
-    @BeforeClass(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class FailoverSubscriptionTest extends SharedPulsarBaseTest {
 
     @Test(timeOut = 30_000, invocationCount = 5)
     public void testWaitingCursorsCountAfterSwitchingActiveConsumers() throws Exception {
-        final String tp = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+        final String tp = newTopicName();
         final String subscription = "s1";
         admin.topics().createNonPartitionedTopic(tp);
         admin.topics().createSubscription(tp, subscription, MessageId.earliest);
-        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(tp, false).join().get();
+        PersistentTopic topic = (PersistentTopic) getTopic(tp, false).join().get();
         Map<String, ConsumerImpl<byte[]>> consumerMap = new HashMap<>();
         ConsumerImpl<byte[]> firstConsumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(tp)
                 .subscriptionType(SubscriptionType.Failover).subscriptionName(subscription).subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
index 7b0edd314d0..928a23b0a4f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
@@ -19,36 +19,22 @@
 package org.apache.pulsar.client.api;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
-public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class NonPartitionedTopicExpectedTest extends SharedPulsarBaseTest {
 
     @Test
     public void testWhenNonPartitionedTopicExists() throws Exception {
-        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String topic = newTopicName();
         admin.topics().createNonPartitionedTopic(topic);
         ProducerBuilderImpl<String> producerBuilder =
                 (ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
@@ -62,7 +48,7 @@ public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
 
     @Test
     public void testWhenPartitionedTopicExists() throws Exception {
-        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, 2);
         ProducerBuilderImpl<String> producerBuilder =
                 (ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
@@ -89,8 +75,8 @@ public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
 
     @Test(dataProvider = "topicTypes")
     public void testWhenTopicNotExists(TopicType topicType) throws Exception {
-        final String namespace = "public/default";
-        final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp");
+        final String namespace = getNamespace();
+        final String topic = newTopicName();
         final TopicName topicName = TopicName.get(topic);
         AutoTopicCreationOverride.Builder policyBuilder = AutoTopicCreationOverride.builder()
                 .topicType(topicType.toString()).allowAutoTopicCreation(true);
@@ -106,9 +92,10 @@ public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
         // Verify: create successfully.
         Producer producer = producerBuilder.create();
         // Verify: only create non-partitioned topic.
-        Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                .partitionedTopicExists(topicName));
-        Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join());
+        Assert.assertFalse(SharedPulsarCluster.get().getPulsarService().getPulsarResources()
+                .getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName));
+        Assert.assertTrue(SharedPulsarCluster.get().getPulsarService().getNamespaceService()
+                .checkNonPartitionedTopicExists(topicName).join());
 
         // cleanup.
         producer.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
index 69c012b7622..4cd9371cd9d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
@@ -25,34 +25,18 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
 @Slf4j
-public class PersistentTopicTerminateTest extends ProducerConsumerBase {
-
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class PersistentTopicTerminateTest extends SharedPulsarBaseTest {
 
     @Test
     public void testRecoverAfterTerminate() throws Exception {
-        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String topicName = newTopicName();
         final String subscriptionName = "s1";
         admin.topics().createNonPartitionedTopic(topicName);
         admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
@@ -83,7 +67,7 @@ public class PersistentTopicTerminateTest extends ProducerConsumerBase {
         admin.topics().skipAllMessages(topicName, subscriptionName);
         Awaitility.await().untilAsserted(() -> {
             PersistentTopic persistentTopic =
-                    (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+                    (PersistentTopic) getTopic(topicName, false).join().get();
             ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
             CompletableFuture<Void> trimLedgersFuture = new CompletableFuture<>();
             ml.trimConsumedLedgersInBackground(trimLedgersFuture);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
index f4daba3bf05..2a7fcb3eb17 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
@@ -19,36 +19,19 @@
 package org.apache.pulsar.client.api;
 
 import io.netty.util.HashedWheelTimer;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
-public class ProducerCleanupTest extends ProducerConsumerBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        // use Pulsar binary lookup since the HTTP client shares the Pulsar client timer
-        isTcpLookup = true;
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ProducerCleanupTest extends SharedPulsarBaseTest {
 
     @Test
     public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws PulsarClientException, InterruptedException {
         Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic("persistent://public/default/" + UUID.randomUUID().toString())
+                .topic(newTopicName())
                 .sendTimeout(1, TimeUnit.SECONDS)
                 .create();
         producer.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java
index 0c470ae5875..2e17913599d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java
@@ -22,25 +22,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import lombok.Cleanup;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-public class ProducerQueueSizeTest extends ProducerConsumerBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ProducerQueueSizeTest extends SharedPulsarBaseTest {
 
     @DataProvider(name = "matrix")
     public Object[][] matrix() {
@@ -62,7 +48,7 @@ public class ProducerQueueSizeTest extends ProducerConsumerBase {
 
         @Cleanup
         PulsarClient client = PulsarClient.builder()
-                .serviceUrl(brokerUrl.toString())
+                .serviceUrl(getWebServiceUrl())
                 .memoryLimit(10, SizeUnit.KILO_BYTES)
                 .build();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
index 327081bf1b9..dd468c80d37 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
@@ -26,34 +26,15 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-public class ReplicateSubscriptionTest extends ProducerConsumerBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
-    }
+public class ReplicateSubscriptionTest extends SharedPulsarBaseTest {
 
     @DataProvider
     public Object[] replicateSubscriptionState() {
@@ -67,7 +48,7 @@ public class ReplicateSubscriptionTest extends ProducerConsumerBase {
     @Test(dataProvider = "replicateSubscriptionState")
     public void testReplicateSubscriptionState(Boolean replicateSubscriptionState)
             throws Exception {
-        String topic = "persistent://my-property/my-ns/" + System.nanoTime();
+        String topic = newTopicName();
         String subName = "sub-" + System.nanoTime();
         ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topic)
@@ -79,7 +60,7 @@ public class ReplicateSubscriptionTest extends ProducerConsumerBase {
         assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState);
         @Cleanup
         Consumer<String> ignored = consumerBuilder.subscribe();
-        CompletableFuture<Optional<Topic>> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic);
+        CompletableFuture<Optional<Topic>> topicIfExists = getTopicIfExists(topic);
         assertThat(topicIfExists)
                 .succeedsWithin(3, TimeUnit.SECONDS)
                 .matches(optionalTopic -> {

Reply via email to