This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ddc94993fd [cleanup] Convert 15 test classes to SharedPulsarBaseTest 
(#25318)
5ddc94993fd is described below

commit 5ddc94993fd0c437286b95f3e224bf52c20c5c35
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 16 08:56:34 2026 -0700

    [cleanup] Convert 15 test classes to SharedPulsarBaseTest (#25318)
---
 .../service/BatchMessageBrokerRestartTest.java     | 120 +++++++++
 .../pulsar/broker/service/BatchMessageTest.java    | 279 ++++++---------------
 .../BatchMessageWithBatchIndexLevelTest.java       |  10 +-
 .../BrokerServiceBundlesCacheInvalidationTest.java |  28 +--
 .../broker/service/ConsumedLedgersTrimTest.java    | 110 ++++----
 .../service/CurrentLedgerRolloverIfFullTest.java   |  22 +-
 .../broker/service/ExclusiveProducerTest.java      |  42 ++--
 .../apache/pulsar/broker/service/KeyValueTest.java |  29 +--
 .../pulsar/broker/service/PartitionKeyTest.java    |  24 +-
 .../pulsar/broker/service/ResendRequestTest.java   | 129 ++++------
 .../broker/service/SharedPulsarBaseTest.java       |  33 ++-
 .../pulsar/broker/service/SharedPulsarCluster.java |   8 +-
 .../broker/service/TopicTerminationTest.java       |  42 ++--
 .../broker/service/persistent/ChecksumTest.java    |  24 +-
 .../PartitionKeywordCompatibilityTest.java         |  41 ++-
 .../broker/service/persistent/ShadowTopicTest.java |  61 ++---
 .../pulsar/client/impl/MessageChecksumTest.java    |  79 ++----
 .../impl/UnAcknowledgedMessagesTimeoutTest.java    |  93 +++----
 .../pulsar/client/impl/ZeroQueueSizeTest.java      |  94 +++----
 19 files changed, 513 insertions(+), 755 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageBrokerRestartTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageBrokerRestartTest.java
new file mode 100644
index 00000000000..eb032257930
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageBrokerRestartTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests batch message behavior across broker restarts.
+ * This test requires stopping and starting the broker, so it cannot use 
SharedPulsarCluster.
+ */
+@Test(groups = "broker")
+public class BatchMessageBrokerRestartTest extends BrokerTestBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "containerBuilder")
+    public Object[][] containerBuilderProvider() {
+        return new Object[][] {
+                { BatcherBuilder.DEFAULT },
+                { BatcherBuilder.KEY_BASED }
+        };
+    }
+
+    @Test(dataProvider = "containerBuilder")
+    public void 
testSimpleBatchProducerWithStoppingAndStartingBroker(BatcherBuilder builder) 
throws Exception {
+        // Send enough messages to trigger one batch by size and then have a 
remaining message in the batch container
+        int numMsgs = 3;
+        int numMsgsInBatch = 2;
+        final String topicName = 
"persistent://prop/ns-abc/testBatchBrokerRestart-" + System.nanoTime();
+        final String subscriptionName = "syncsub-1";
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        consumer.close();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+                .batchingMaxMessages(numMsgsInBatch)
+                .enableBatching(true)
+                .batcherBuilder(builder)
+                .create();
+
+        stopBroker();
+
+        List<CompletableFuture<MessageId>> messages = new ArrayList<>();
+        for (int i = 0; i < numMsgs; i++) {
+            byte[] message = ("my-message-" + i).getBytes();
+            messages.add(producer.sendAsync(message));
+        }
+
+        startBroker();
+
+        // Fail if any one message fails to get acknowledged
+        FutureUtil.waitForAll(messages).get(30, TimeUnit.SECONDS);
+
+        Awaitility.await().timeout(30, TimeUnit.SECONDS)
+                .until(() -> 
pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+
+        rolloverPerIntervalStats();
+        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 2);
+        consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+        for (int i = 0; i < numMsgs; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            String receivedMessage = new String(msg.getData());
+            String expectedMessage = "my-message-" + i;
+            Assert.assertEquals(receivedMessage, expectedMessage,
+                    "Received message " + receivedMessage + " did not match 
the expected message " + expectedMessage);
+        }
+        consumer.close();
+        producer.close();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 05de288639d..860435fdd9e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -37,9 +37,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
-import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
-import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
@@ -53,32 +50,22 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class BatchMessageTest extends BrokerTestBase {
+public class BatchMessageTest extends SharedPulsarBaseTest {
 
-    private static final Logger log = 
LoggerFactory.getLogger(BatchMessageTest.class);
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.baseSetup();
-    }
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchMessageTest.class);
 
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
+    private long getBacklog(String topicName, String subscriptionName) throws 
Exception {
+        return admin.topics().getStats(topicName)
+                .getSubscriptions().get(subscriptionName).getMsgBacklog();
     }
 
     @DataProvider(name = "codecAndContainerBuilder")
@@ -115,8 +102,7 @@ public class BatchMessageTest extends BrokerTestBase {
             throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = 
"persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-"
-                + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -137,13 +123,9 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        // we expect 2 messages in the backlog since we sent 50 messages with 
the batch size set to 25. We have set the
-        // batch time high enough for it to not affect the number of messages 
in the batch
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 2);
+        // we expect 2 entries in the backlog since we sent 50 messages with 
the batch size set to 25.
+        // We have set the batch time high enough for it to not affect the 
number of messages in the batch
+        assertEquals(getBacklog(topicName, subscriptionName), 2);
         consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
 
         for (int i = 0; i < numMsgs; i++) {
@@ -163,8 +145,7 @@ public class BatchMessageTest extends BrokerTestBase {
             throws Exception {
         int numMsgs = 50;
         int numBytesInBatch = 600;
-        final String topicName = 
"persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-"
-                + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "sub-1" + compressionType.toString();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -188,13 +169,8 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        // we expect 2 messages in the backlog since we sent 50 messages with 
the batch size set to 25. We have set the
-        // batch time high enough for it to not affect the number of messages 
in the batch
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 2);
+        // we expect 2 entries in the backlog since we sent 50 messages with 
the batch size set to 25.
+        assertEquals(getBacklog(topicName, subscriptionName), 2);
         consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
 
         for (int i = 0; i < numMsgs; i++) {
@@ -213,8 +189,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testSimpleBatchProducerWithFixedBatchTime(CompressionType 
compressionType, BatcherBuilder builder)
             throws Exception {
         int numMsgs = 100;
-        final String topicName = 
"persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime-"
-                + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "time-sub-1" + 
compressionType.toString();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -236,13 +211,9 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        LOG.info("Sent {} messages, backlog is {} messages", numMsgs,
-                
topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
-        
assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false)
 < numMsgs);
+        long backlog = getBacklog(topicName, subscriptionName);
+        LOG.info("Sent {} messages, backlog is {} entries", numMsgs, backlog);
+        assertTrue(backlog < numMsgs);
 
         producer.close();
     }
@@ -251,8 +222,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void 
testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType 
compressionType,
                                                                  
BatcherBuilder builder) throws Exception {
         int numMsgs = 100;
-        final String topicName = 
"persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime-"
-                + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "time-size-sub-1" + 
compressionType.toString();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -274,13 +244,9 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        LOG.info("Sent {} messages, backlog is {} messages", numMsgs,
-                
topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
-        
assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false)
 < numMsgs);
+        long backlog2 = getBacklog(topicName, subscriptionName);
+        LOG.info("Sent {} messages, backlog is {} entries", numMsgs, backlog2);
+        assertTrue(backlog2 < numMsgs);
 
         producer.close();
     }
@@ -290,7 +256,7 @@ public class BatchMessageTest extends BrokerTestBase {
             throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = 
"persistent://prop/ns-abc/testBatchProducerWithLargeMessage-" + 
UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "large-message-sub-1" + 
compressionType.toString();
 
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -322,13 +288,9 @@ public class BatchMessageTest extends BrokerTestBase {
 
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        // we expect 3 messages in the backlog since the large message in the 
middle should
+        // we expect 3 entries in the backlog since the large message in the 
middle should
         // close out the batch and be sent in a batch of its own
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 3);
+        assertEquals(getBacklog(topicName, subscriptionName), 3);
         consumer = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionName(subscriptionName)
@@ -341,8 +303,8 @@ public class BatchMessageTest extends BrokerTestBase {
             LOG.info("received msg size: {}", msg.getData().length);
             consumer.acknowledge(msg);
         }
-        Thread.sleep(100);
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 0);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(getBacklog(topicName, subscriptionName), 0));
         consumer.close();
         producer.close();
     }
@@ -352,7 +314,7 @@ public class BatchMessageTest extends BrokerTestBase {
             throws Exception {
         int numMsgs = 500;
         int numMsgsInBatch = numMsgs / 20;
-        final String topicName = 
"persistent://prop/ns-abc/testSimpleBatchProducerConsumer-" + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "pc-sub-1" + 
compressionType.toString();
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -381,12 +343,7 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
-                numMsgs / numMsgsInBatch);
+        assertEquals(getBacklog(topicName, subscriptionName), numMsgs / 
numMsgsInBatch);
         consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
 
         Message<byte[]> lastunackedMsg = null;
@@ -402,8 +359,8 @@ public class BatchMessageTest extends BrokerTestBase {
         if (lastunackedMsg != null) {
             consumer.acknowledgeCumulative(lastunackedMsg);
         }
-        Thread.sleep(100);
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 0);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(getBacklog(topicName, subscriptionName), 0));
         consumer.close();
         producer.close();
     }
@@ -412,8 +369,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder 
builder) throws Exception {
         int numMsgs = 10;
         int numMsgsInBatch = numMsgs / 2;
-        final String topicName = 
"persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-"
-                + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "syncsub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -431,67 +387,9 @@ public class BatchMessageTest extends BrokerTestBase {
             producer.send(message);
         }
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        // we would expect 2 messages in the backlog since we sent 10 messages 
with the batch size set to 5.
+        // we would expect 2 entries in the backlog since we sent 10 messages 
with the batch size set to 5.
         // However, we are using synchronous send and so each message will go 
as an individual message
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 10);
-        consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
-
-        for (int i = 0; i < numMsgs; i++) {
-            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
-            assertNotNull(msg);
-            String receivedMessage = new String(msg.getData());
-            String expectedMessage = "my-message-" + i;
-            Assert.assertEquals(receivedMessage, expectedMessage,
-                    "Received message " + receivedMessage + " did not match 
the expected message " + expectedMessage);
-        }
-        consumer.close();
-        producer.close();
-    }
-
-    @Test(dataProvider = "containerBuilder")
-    public void 
testSimpleBatchProducerWithStoppingAndStartingBroker(BatcherBuilder builder) 
throws Exception {
-        // Send enough messages to trigger one batch by size and then have a 
remaining message in the batch container
-        int numMsgs = 3;
-        int numMsgsInBatch = 2;
-        final String topicName = 
"persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-"
-                + UUID.randomUUID();
-        final String subscriptionName = "syncsub-1";
-
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                .subscribe();
-        consumer.close();
-
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
-                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
-                .batchingMaxMessages(numMsgsInBatch)
-                .enableBatching(true)
-                .batcherBuilder(builder)
-                .create();
-
-        stopBroker();
-
-        List<CompletableFuture<MessageId>> messages = new ArrayList<>();
-        for (int i = 0; i < numMsgs; i++) {
-            byte[] message = ("my-message-" + i).getBytes();
-            messages.add(producer.sendAsync(message));
-        }
-
-        startBroker();
-
-        // Fail if any one message fails to get acknowledged
-        FutureUtil.waitForAll(messages).get(30, TimeUnit.SECONDS);
-
-        Awaitility.await().timeout(30, TimeUnit.SECONDS)
-                .until(() -> 
pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 2);
+        assertEquals(getBacklog(topicName, subscriptionName), 10);
         consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
 
         for (int i = 0; i < numMsgs; i++) {
@@ -510,8 +408,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder 
builder) throws Exception {
         int numMsgs = 2000;
         int numMsgsInBatch = 4;
-        final String topicName = 
"persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages-"
-                + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "pc1k-sub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -542,13 +439,7 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         LOG.info("[{}] sent {} messages", subscriptionName, numMsgs);
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        // allow stats to be updated..
-        LOG.info("[{}] checking backlog stats..", topic);
-        rolloverPerIntervalStats();
-        assertEquals(topic.getSubscription(subscriptionName)
-                .getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
+        assertEquals(getBacklog(topicName, subscriptionName), numMsgs / 
numMsgsInBatch);
         consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
 
         Message<byte[]> lastunackedMsg = null;
@@ -561,9 +452,10 @@ public class BatchMessageTest extends BrokerTestBase {
             consumer.acknowledgeCumulative(lastunackedMsg);
         }
 
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(getBacklog(topicName, subscriptionName), 0));
         consumer.close();
         producer.close();
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 0);
     }
 
     // test for ack holes
@@ -576,7 +468,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testOutOfOrderAcksForBatchMessage() throws Exception {
         int numMsgs = 40;
         int numMsgsInBatch = numMsgs / 4;
-        final String topicName = 
"persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage";
+        final String topicName = newTopicName();
         final String subscriptionName = "oooack-sub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -595,11 +487,7 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        assertEquals(topic.getSubscription(subscriptionName)
-                .getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
+        assertEquals(getBacklog(topicName, subscriptionName), numMsgs / 
numMsgsInBatch);
         consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
         Set<Integer> individualAcks = new HashSet<>();
         for (int i = 15; i < 20; i++) {
@@ -617,24 +505,21 @@ public class BatchMessageTest extends BrokerTestBase {
             } else if (i == 14) {
                 // should ack lid =0 eid = 1 on broker
                 consumer.acknowledgeCumulative(msg);
-                Thread.sleep(1000);
-                rolloverPerIntervalStats();
-                Thread.sleep(1000);
-                
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 3);
+                Awaitility.await().untilAsserted(() ->
+                        assertEquals(getBacklog(topicName, subscriptionName), 
3));
             } else if (individualAcks.contains(i)) {
                 consumer.acknowledge(msg);
             } else {
                 lastunackedMsg = msg;
             }
         }
-        Thread.sleep(1000);
-        rolloverPerIntervalStats();
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 2);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(getBacklog(topicName, subscriptionName), 2));
         if (lastunackedMsg != null) {
             consumer.acknowledgeCumulative(lastunackedMsg);
         }
-        Thread.sleep(100);
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 0);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(getBacklog(topicName, subscriptionName), 0));
         consumer.close();
         producer.close();
     }
@@ -643,8 +528,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder 
builder) throws Exception {
         int numMsgs = 10;
         int numMsgsInBatch = numMsgs;
-        final String topicName = 
"persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish-"
-                + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "nbcaabp-sub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -671,11 +555,7 @@ public class BatchMessageTest extends BrokerTestBase {
         byte[] nobatchmsg = ("nobatch").getBytes();
         noBatchProducer.sendAsync(nobatchmsg).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 2);
+        assertEquals(getBacklog(topicName, subscriptionName), 2);
         consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
 
         Message<byte[]> lastunackedMsg = null;
@@ -685,9 +565,8 @@ public class BatchMessageTest extends BrokerTestBase {
             lastunackedMsg = msg;
         }
         consumer.acknowledgeCumulative(lastunackedMsg);
-        Thread.sleep(100);
-        rolloverPerIntervalStats();
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
 0);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(getBacklog(topicName, subscriptionName), 0));
         consumer.close();
         producer.close();
         noBatchProducer.close();
@@ -697,7 +576,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) 
throws Exception {
         int numMsgs = 50;
         int numMsgsInBatch = numMsgs / 10;
-        final String topicName = 
"persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks-" + 
UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "bnb-sub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -726,11 +605,7 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
-        rolloverPerIntervalStats();
-        
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn 
> 0.0);
-        
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
+        assertEquals(getBacklog(topicName, subscriptionName),
                 (numMsgs / 2) / numMsgsInBatch + numMsgs / 2);
         consumer = pulsarClient.newConsumer()
                     .topic(topicName)
@@ -753,8 +628,8 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         consumer.acknowledgeCumulative(lastunackedMsg);
 
-        retryStrategically(t -> topic.getSubscription(subscriptionName)
-                .getNumberOfEntriesInBacklog(false) == 0, 100, 100);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(getBacklog(topicName, subscriptionName), 0));
 
         consumer.close();
         producer.close();
@@ -769,7 +644,7 @@ public class BatchMessageTest extends BrokerTestBase {
     @Test(dataProvider = "containerBuilder")
     public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws 
Exception {
         int numMsgs = 10;
-        final String topicName = "persistent://prop/ns-abc/testConcurrentAck-" 
+ UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "sub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -789,11 +664,8 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
-
         final Consumer<byte[]> myConsumer = 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscribe();
-        // assertEquals(dispatcher.getTotalUnackedMessages(), 1);
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newFixedThreadPool(10);
 
@@ -812,11 +684,12 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         latch.await();
 
-        AbstractPersistentDispatcherMultipleConsumers dispatcher = 
(AbstractPersistentDispatcherMultipleConsumers) topic
-                .getSubscription(subscriptionName).getDispatcher();
         // check strategically to let ack-message receive by broker
-        retryStrategically((test) -> 
dispatcher.getConsumers().get(0).getUnackedMessages() == 0, 50, 150);
-        assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
+        Awaitility.await().untilAsserted(() -> {
+            long unacked = 
admin.topics().getStats(topicName).getSubscriptions()
+                    
.get(subscriptionName).getConsumers().get(0).getUnackedMessages();
+            assertEquals(unacked, 0);
+        });
 
         executor.shutdownNow();
         myConsumer.close();
@@ -826,7 +699,7 @@ public class BatchMessageTest extends BrokerTestBase {
     @Test
     public void testOrderingOfKeyBasedBatchMessageContainer()
             throws PulsarClientException, ExecutionException, 
InterruptedException {
-        final String topicName = "persistent://prop/ns-abc/testKeyBased";
+        final String topicName = newTopicName();
         final String subscriptionName = "sub-1";
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
@@ -892,7 +765,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
     @Test(dataProvider = "containerBuilder")
     public void testBatchSendOneMessage(BatcherBuilder builder) throws 
Exception {
-        final String topicName = 
"persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "sub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -926,7 +799,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws 
Exception {
 
         int numMsgs = 10;
-        final String topicName = 
"persistent://prop/ns-abc/testRetrieveSequenceIdGenerated-" + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "sub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -959,7 +832,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testRetrieveSequenceIdSpecify(BatcherBuilder builder) throws 
Exception {
 
         int numMsgs = 10;
-        final String topicName = 
"persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "sub-1";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -992,7 +865,7 @@ public class BatchMessageTest extends BrokerTestBase {
     public void testSendOverSizeMessage(CompressionType compressionType, 
BatcherBuilder builder) throws Exception {
 
         final int numMsgs = 10;
-        final String topicName = 
"persistent://prop/ns-abc/testSendOverSizeMessage-" + UUID.randomUUID();
+        final String topicName = newTopicName();
 
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
@@ -1021,8 +894,7 @@ public class BatchMessageTest extends BrokerTestBase {
 
         int numMsgs = 1000;
         int batchMessages = 10;
-        final String topicName = 
"persistent://prop/ns-abc/testBatchMessageDispatchingAccordingToPermits-"
-                + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "bmdap-sub-1";
 
         ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
@@ -1059,7 +931,7 @@ public class BatchMessageTest extends BrokerTestBase {
     private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType 
subType,
                                                              boolean 
enableBatch) throws Exception {
         final int messageCount = 50;
-        final String topicName = 
"persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
+        final String topicName = newTopicName();
         final String subscriptionName = "sub-batch-1";
         @Cleanup
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
@@ -1081,7 +953,7 @@ public class BatchMessageTest extends BrokerTestBase {
         CountDownLatch countDownLatch = new CountDownLatch(messageCount);
         for (int i = 0; i < messageCount; i++) {
             producer.sendAsync((i + "").getBytes()).thenAccept(msgId -> {
-                log.info("Published message with msgId: {}", msgId);
+                LOG.info("Published message with msgId: {}", msgId);
                 countDownLatch.countDown();
             });
             // To generate batch message with different batch size
@@ -1111,26 +983,19 @@ public class BatchMessageTest extends BrokerTestBase {
             }
         }
 
-        String topic = TopicName.get(topicName).toString();
-        PersistentSubscription persistentSubscription =  
(PersistentSubscription) pulsar.getBrokerService()
-                .getTopic(topic, 
false).get().get().getSubscription(subscriptionName);
-
         Awaitility.await().untilAsserted(() -> {
+            long unacked = 
admin.topics().getStats(topicName).getSubscriptions()
+                    
.get(subscriptionName).getConsumers().get(0).getUnackedMessages();
             if (subType == SubscriptionType.Shared) {
                 if (enableBatch) {
-                    if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) {
-                        
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 
5 * 1);
-                    } else {
-                        
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 
5 * 3);
-                    }
+                    // acknowledgmentAtBatchIndexLevelEnabled defaults to true
+                    assertEquals(unacked, 5 * 1);
                 } else {
-                    
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 
messageCount / 2);
+                    assertEquals(unacked, messageCount / 2);
                 }
             } else {
-                
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 
0);
+                assertEquals(unacked, 0);
             }
         });
     }
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(BatchMessageTest.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 417b87acb09..faa49c4c930 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -58,20 +58,28 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker")
-public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
+public class BatchMessageWithBatchIndexLevelTest extends BrokerTestBase {
 
     @BeforeClass
     @Override
     protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
         super.baseSetup();
     }
 
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
     @Test
     @SneakyThrows
     public void testBatchMessageAck() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java
index 1144c989819..1d90a2c49fd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java
@@ -23,32 +23,15 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.BundlesData;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Slf4j
-public class BrokerServiceBundlesCacheInvalidationTest extends BrokerTestBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class BrokerServiceBundlesCacheInvalidationTest extends 
SharedPulsarBaseTest {
 
     @Test
     public void testRecreateNamespace() throws Exception {
-        String namespace = "prop/test-" + System.nanoTime();
-        String topic = namespace + "/my-topic";
-
-        // First create namespace with 20 bundles
-        admin.namespaces().createNamespace(namespace, 20);
+        String namespace = getNamespace();
+        String topic = "persistent://" + namespace + "/my-topic";
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
@@ -59,11 +42,12 @@ public class BrokerServiceBundlesCacheInvalidationTest 
extends BrokerTestBase {
 
         // Delete and recreate with 32 bundles
         admin.topics().delete(topic);
-        deleteNamespaceWithRetry(namespace, false);
+        admin.namespaces().deleteNamespace(namespace, true);
+
         admin.namespaces().createNamespace(namespace, 32);
 
         BundlesData bundlesData = admin.namespaces().getBundles(namespace);
         log.info("BUNDLES: {}", admin.namespaces().getBundles(namespace));
         assertEquals(bundlesData.getNumBundles(), 32);
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 6f47c2c16b9..4548392377f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -36,41 +36,24 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class ConsumedLedgersTrimTest extends BrokerTestBase {
+public class ConsumedLedgersTrimTest extends SharedPulsarBaseTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ConsumedLedgersTrimTest.class);
 
-    @Override
-    protected void setup() throws Exception {
-        //No-op
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
-        super.conf.setDefaultRetentionSizeInMB(-1);
-        super.conf.setDefaultRetentionTimeInMinutes(-1);
-    }
-
     @Test
     public void testConsumedLedgersTrim() throws Exception {
-        conf.setRetentionCheckIntervalInSeconds(1);
-        super.baseSetup();
-        final String topicName = 
"persistent://prop/ns-abc/TestConsumedLedgersTrim";
+        // Set infinite retention at namespace level so ledgers are preserved 
until explicitly trimmed
+        admin.namespaces().setRetention(getNamespace(), new 
RetentionPolicies(-1, -1));
+
+        final String topicName = newTopicName();
         final String subscriptionName = "my-subscriber-name";
 
         @Cleanup
@@ -81,9 +64,9 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
         @Cleanup
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .subscribe();
-        Topic topicRef = 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        Topic topicRef = getTopicReference(topicName).get();
         Assert.assertNotNull(topicRef);
-        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        PersistentTopic persistentTopic = (PersistentTopic) 
getTopic(topicName, true).get().get();
 
         ManagedLedgerConfig managedLedgerConfig = 
persistentTopic.getManagedLedger().getConfig();
         managedLedgerConfig.setRetentionSizeInMB(1L);
@@ -101,7 +84,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
             Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 
1, msgNum / 2);
         });
 
-        //no traffic, unconsumed ledger will be retained
+        // no traffic, unconsumed ledger will be retained
         Thread.sleep(1200);
         Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 1, 
msgNum / 2);
 
@@ -111,18 +94,22 @@ public class ConsumedLedgersTrimTest extends 
BrokerTestBase {
             consumer.acknowledge(msg);
         }
 
-        //no traffic, but consumed ledger will be cleaned
+        // Explicitly trigger trim instead of relying on the broker's 
retention check timer
         Thread.sleep(1500);
+        CompletableFuture<Void> trimFuture = new CompletableFuture<>();
+        managedLedger.trimConsumedLedgersInBackground(trimFuture);
+        trimFuture.join();
+
         Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
     }
 
 
     @Test
     public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
-        conf.setRetentionCheckIntervalInSeconds(1);
-        conf.setBrokerDeleteInactiveTopicsEnabled(false);
-        super.baseSetup();
-        final String topicName = 
"persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions";
+        // Set infinite retention at namespace level so ledgers are preserved 
until explicitly trimmed
+        admin.namespaces().setRetention(getNamespace(), new 
RetentionPolicies(-1, -1));
+
+        final String topicName = newTopicName();
 
         // write some messages
         @Cleanup
@@ -133,7 +120,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase 
{
 
         // set retention parameters, the ledgers are to be deleted as soon as 
possible
         // but the topic is not to be automatically deleted
-        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        PersistentTopic persistentTopic = (PersistentTopic) 
getTopic(topicName, true).get().get();
         ManagedLedgerConfig managedLedgerConfig = 
persistentTopic.getManagedLedger().getConfig();
         managedLedgerConfig.setRetentionSizeInMB(-1);
         managedLedgerConfig.setRetentionTime(-1, TimeUnit.SECONDS);
@@ -149,20 +136,22 @@ public class ConsumedLedgersTrimTest extends 
BrokerTestBase {
 
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
         Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
-        MessageId messageIdBeforeRestart = 
pulsar.getAdminClient().topics().getLastMessageId(topicName);
-        LOG.info("messageIdBeforeRestart " + messageIdBeforeRestart);
-        assertNotEquals(messageIdBeforeRestart, initialMessageId);
-
-        // restart the broker we have to start a new ledger
-        // the lastMessageId is still on the previous ledger
-        restartBroker();
-        // force load topic
-        pulsar.getAdminClient().topics().getStats(topicName);
-        MessageId messageIdAfterRestart = 
pulsar.getAdminClient().topics().getLastMessageId(topicName);
-        LOG.info("lastmessageid " + messageIdAfterRestart);
-        assertEquals(messageIdAfterRestart, messageIdBeforeRestart);
-
-        persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        MessageId messageIdBeforeUnload = 
admin.topics().getLastMessageId(topicName);
+        LOG.info("messageIdBeforeUnload " + messageIdBeforeUnload);
+        assertNotEquals(messageIdBeforeUnload, initialMessageId);
+
+        // Unload and close the producer to force a new ledger when the topic 
is reloaded.
+        // The lastMessageId should still refer to the previous ledger.
+        producer.close();
+        admin.topics().unload(topicName);
+
+        // Force-load the topic again
+        admin.topics().getStats(topicName);
+        MessageId messageIdAfterUnload = 
admin.topics().getLastMessageId(topicName);
+        LOG.info("lastmessageid " + messageIdAfterUnload);
+        assertEquals(messageIdAfterUnload, messageIdBeforeUnload);
+
+        persistentTopic = (PersistentTopic) getTopic(topicName, 
true).get().get();
         managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
         // now we have two ledgers, the first is expired but is contains the 
lastMessageId
         // the second is empty and should be kept as it is the current tail
@@ -181,7 +170,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase 
{
 
         // lastMessageId should be available even in this case, but is must
         // refer to -1
-        MessageId messageIdAfterTrim = 
pulsar.getAdminClient().topics().getLastMessageId(topicName);
+        MessageId messageIdAfterTrim = 
admin.topics().getLastMessageId(topicName);
         LOG.info("lastmessageid " + messageIdAfterTrim);
         assertEquals(messageIdAfterTrim, MessageId.earliest);
 
@@ -189,31 +178,29 @@ public class ConsumedLedgersTrimTest extends 
BrokerTestBase {
 
     @Test
     public void testAdminTrimLedgers() throws Exception {
-        conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE / 2);
-        conf.setDefaultNumberOfNamespaceBundles(1);
-        super.baseSetup();
-        final String topicName = 
"persistent://prop/ns-abc/TestAdminTrimLedgers" + UUID.randomUUID();
         final String subscriptionName = "my-sub";
         final int maxEntriesPerLedger = 2;
         final int partitionedNum = 3;
 
-        admin.topics().createPartitionedTopic(topicName, partitionedNum);
+        String partitionedTopic = "persistent://" + getNamespace()
+                + "/trim-ledgers-" + UUID.randomUUID().toString().substring(0, 
8);
+        admin.topics().createPartitionedTopic(partitionedTopic, 
partitionedNum);
         @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic(topicName)
+                .topic(partitionedTopic)
                 .enableBatching(false)
                 .producerName("producer-name")
                 .create();
         @Cleanup
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                .subscribe();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(partitionedTopic)
+                .subscriptionName(subscriptionName).subscribe();
         for (int i = 0; i < partitionedNum; i++) {
-            String topic = TopicName.get(topicName).getPartition(i).toString();
-            Topic topicRef = 
pulsar.getBrokerService().getTopicReference(topic).get();
+            String topic = 
TopicName.get(partitionedTopic).getPartition(i).toString();
+            Topic topicRef = getTopicReference(topic).get();
             Assert.assertNotNull(topicRef);
         }
-        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
-                
.getTopicReference(TopicName.get(topicName).getPartition(0).toString()).get();
+        PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(
+                
TopicName.get(partitionedTopic).getPartition(0).toString()).get();
         ManagedLedgerConfig managedLedgerConfig = 
persistentTopic.getManagedLedger().getConfig();
         managedLedgerConfig.setRetentionSizeInMB(-1);
         managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
@@ -231,7 +218,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase 
{
             consumer.acknowledge(msg);
         }
         //consumed ledger should be cleaned
-        admin.topics().trimTopic(topicName);
+        admin.topics().trimTopic(partitionedTopic);
         Awaitility.await().untilAsserted(() ->
                 
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1));
 
@@ -239,8 +226,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase 
{
 
     @Test
     public void trimNonPersistentTopic() throws Exception {
-        super.baseSetup();
-        String topicName = 
"non-persistent://prop/ns-abc/trimNonPersistentTopic" + UUID.randomUUID();
+        String topicName = "non-persistent://" + getNamespace() + 
"/trimNonPersistentTopic" + UUID.randomUUID();
         int partitionedNum = 3;
         admin.topics().createPartitionedTopic(topicName, partitionedNum);
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index 4f83d25a292..226456df597 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -30,28 +30,14 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        baseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        internalCleanup();
-    }
+public class CurrentLedgerRolloverIfFullTest extends SharedPulsarBaseTest {
 
     @Test
     public void testCurrentLedgerRolloverIfFull() throws Exception {
-        final String topicName = 
"persistent://prop/ns-abc/CurrentLedgerRolloverIfFullTest";
+        final String topicName = newTopicName();
 
         @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
@@ -65,9 +51,9 @@ public class CurrentLedgerRolloverIfFullTest extends 
BrokerTestBase {
                 
.subscriptionName("CurrentLedgerRolloverIfFullTest-subscriber-name")
                 .subscribe();
 
-        Topic topicRef = 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        Topic topicRef = getTopicReference(topicName).get();
         Assert.assertNotNull(topicRef);
-        PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        PersistentTopic persistentTopic = (PersistentTopic) 
getTopic(topicName, true).get().get();
 
         ManagedLedgerConfig managedLedgerConfig = 
persistentTopic.getManagedLedger().getConfig();
         managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index c108d5b8ca1..1acb7917ac6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.util.HashedWheelTimer;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -39,25 +40,11 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class ExclusiveProducerTest extends BrokerTestBase {
-
-    @BeforeClass
-    protected void setup() throws Exception {
-        // use Pulsar binary lookup since the HTTP client shares the Pulsar 
client timer
-        isTcpLookup = true;
-        baseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    protected void cleanup() throws Exception {
-        internalCleanup();
-    }
+public class ExclusiveProducerTest extends SharedPulsarBaseTest {
 
     @DataProvider(name = "topics")
     public static Object[][] topics() {
@@ -150,7 +137,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
 
         @Cleanup
         PulsarClient pulsarClient2 = PulsarClient.builder()
-                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .serviceUrl(getBrokerServiceUrl())
                 .operationTimeout(2, TimeUnit.SECONDS)
                 .build();
 
@@ -222,13 +209,21 @@ public class ExclusiveProducerTest extends BrokerTestBase 
{
     @Test(dataProvider = "topics")
     public void testProducerTasksCleanupWhenUsingExclusiveProducers(String 
type, boolean partitioned) throws Exception {
         String topic = newTopic(type, partitioned);
-        Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+
+        // Use a dedicated client so we can assert zero pending timeouts 
without interference
+        // from operations on the shared client.
+        @Cleanup
+        PulsarClient dedicatedClient = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .build();
+
+        Producer<String> p1 = dedicatedClient.newProducer(Schema.STRING)
                 .topic(topic)
                 .accessMode(ProducerAccessMode.Exclusive)
                 .create();
 
         try {
-            pulsarClient.newProducer(Schema.STRING)
+            dedicatedClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .accessMode(ProducerAccessMode.Exclusive)
                     .create();
@@ -239,7 +234,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
 
         p1.close();
 
-        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
dedicatedClient).timer();
         Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(timer.pendingTimeouts(), 0));
     }
 
@@ -322,13 +317,13 @@ public class ExclusiveProducerTest extends BrokerTestBase 
{
 
         // Simulate a producer that takes over and fences p1 through the topic 
epoch
         if (!partitioned) {
-            Topic t = pulsar.getBrokerService().getTopic(topic, 
false).get().get();
+            Topic t = getTopic(topic, false).get().get();
             CompletableFuture<?> f = ((AbstractTopic) 
t).incrementTopicEpoch(Optional.of(0L));
             f.get();
         } else {
             for (int i = 0; i < 3; i++) {
                 String name = TopicName.get(topic).getPartition(i).toString();
-                Topic t = pulsar.getBrokerService().getTopic(name, 
false).get().get();
+                Topic t = getTopic(name, false).get().get();
                 CompletableFuture<?> f = ((AbstractTopic) 
t).incrementTopicEpoch(Optional.of(0L));
                 f.get();
             }
@@ -423,7 +418,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
 
         @Cleanup
         PulsarClient client = PulsarClient.builder()
-                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .serviceUrl(getBrokerServiceUrl())
                 .operationTimeout(1, TimeUnit.SECONDS)
                 .build();
 
@@ -463,7 +458,8 @@ public class ExclusiveProducerTest extends BrokerTestBase {
     }
 
     private String newTopic(String type, boolean isPartitioned) throws 
Exception {
-        String topic = type + "://" + newTopicName();
+        String topicSuffix = UUID.randomUUID().toString().substring(0, 8);
+        String topic = type + "://" + getNamespace() + "/topic-" + topicSuffix;
         if (isPartitioned) {
             admin.topics().createPartitionedTopic(topic, 3);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
index fe18e810a4e..c440bf9d162 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
@@ -34,32 +34,18 @@ import 
org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
- * Null value message produce and consume test.
+ * KeyValue schema produce and consume test.
  */
 @Slf4j
 @Test(groups = "broker")
-public class KeyValueTest extends BrokerTestBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class KeyValueTest extends SharedPulsarBaseTest {
 
     @Test
-    public void keyValueAutoConsumeTest()  throws Exception {
-        String topic = "persistent://prop/ns-abc/kv-record";
+    public void keyValueAutoConsumeTest() throws Exception {
+        String topic = newTopicName();
         admin.topics().createNonPartitionedTopic(topic);
 
         RecordSchemaBuilder builder = SchemaBuilder
@@ -86,11 +72,8 @@ public class KeyValueTest extends BrokerTestBase {
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
 
-
         Message<KeyValue<GenericRecord, GenericRecord>> message = 
consumer.receive();
-        assertEquals(key.getField("test"), 
message.getValue().getKey().getField("test"));
-        assertEquals(value.getField("test"), 
message.getValue().getValue().getField("test"));
-
+        assertEquals(message.getValue().getKey().getField("test"), 
key.getField("test"));
+        assertEquals(message.getValue().getValue().getField("test"), 
value.getField("test"));
     }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PartitionKeyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PartitionKeyTest.java
index 80aa5f198cc..2114aa8a5de 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PartitionKeyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PartitionKeyTest.java
@@ -20,39 +20,27 @@ package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import lombok.Cleanup;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class PartitionKeyTest extends BrokerTestBase {
-
-    @BeforeMethod
-    @Override
-    public void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    public void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class PartitionKeyTest extends SharedPulsarBaseTest {
 
     @Test(timeOut = 10000)
     public void testPartitionKey() throws Exception {
-        final String topicName = "persistent://prop/ns-abc/testPartitionKey";
+        final String topicName = newTopicName();
 
+        @Cleanup
         org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-subscription").subscribe();
 
         // 1. producer with batch enabled
+        @Cleanup
         Producer<byte[]> producerWithBatches = 
pulsarClient.newProducer().topic(topicName).enableBatching(true)
                 .create();
 
-
         // 2. Producer without batches
         Producer<byte[]> producerWithoutBatches = 
pulsarClient.newProducer().topic(topicName).create();
 
@@ -70,7 +58,5 @@ public class PartitionKeyTest extends BrokerTestBase {
 
             consumer.acknowledge(msg);
         }
-
     }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 2b6ff1575e0..e4d3863ecdb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -40,33 +41,18 @@ import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class ResendRequestTest extends BrokerTestBase {
+public class ResendRequestTest extends SharedPulsarBaseTest {
     private static final long testTimeout = 60000; // 1 min
     private static final Logger log = 
LoggerFactory.getLogger(ResendRequestTest.class);
 
-    @BeforeMethod
-    @Override
-    public void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    public void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @Test(timeOut = testTimeout)
     public void testExclusiveSingleAckedNormalTopic() throws Exception {
-        String key = "testExclusiveSingleAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
 
         HashSet<MessageId> messageIdHashSet = new HashSet<>();
@@ -78,7 +64,7 @@ public class ResendRequestTest extends BrokerTestBase {
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
 
-        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentTopic topicRef = (PersistentTopic) 
getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
 
@@ -156,17 +142,16 @@ public class ResendRequestTest extends BrokerTestBase {
 
     @Test(timeOut = testTimeout)
     public void testSharedSingleAckedNormalTopic() throws Exception {
-        String key = "testSharedSingleAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-shared-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-shared-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
-        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentTopic topicRef = (PersistentTopic) 
getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
 
@@ -175,7 +160,8 @@ public class ResendRequestTest extends BrokerTestBase {
                 
.subscriptionName(subscriptionName).receiverQueueSize(totalMessages / 2)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
 
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> consumer2 = 
newPulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName(subscriptionName).receiverQueueSize(totalMessages / 2)
                 .subscriptionType(SubscriptionType.Shared).subscribe();
@@ -235,7 +221,6 @@ public class ResendRequestTest extends BrokerTestBase {
             message2 = consumer2.receive(200, TimeUnit.MILLISECONDS);
         } while (message1 != null || message2 != null);
         log.info("Additional received = " + receivedMessagesAfterRedelivery);
-        newPulsarClient.close();
         assertTrue(receivedMessagesAfterRedelivery > 0);
 
         assertEquals(receivedConsumer1 + receivedConsumer2, totalMessages);
@@ -243,17 +228,16 @@ public class ResendRequestTest extends BrokerTestBase {
 
     @Test(timeOut = testTimeout)
     public void testFailoverSingleAckedNormalTopic() throws Exception {
-        String key = "testFailoverSingleAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-failover-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-failover-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
-        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentTopic topicRef = (PersistentTopic) 
getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
 
@@ -361,10 +345,9 @@ public class ResendRequestTest extends BrokerTestBase {
 
     @Test(timeOut = testTimeout)
     public void testExclusiveCumulativeAckedNormalTopic() throws Exception {
-        String key = "testExclusiveCumulativeAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
 
         // 1. producer connect
@@ -373,7 +356,7 @@ public class ResendRequestTest extends BrokerTestBase {
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
 
-        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentTopic topicRef = (PersistentTopic) 
getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
 
@@ -418,10 +401,9 @@ public class ResendRequestTest extends BrokerTestBase {
 
     @Test(timeOut = testTimeout)
     public void testExclusiveSingleAckedPartitionedTopic() throws Exception {
-        String key = "testExclusiveSingleAckedPartitionedTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
         final int numberOfPartitions = 4;
         admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -471,10 +453,9 @@ public class ResendRequestTest extends BrokerTestBase {
 
     @Test(timeOut = testTimeout)
     public void testSharedSingleAckedPartitionedTopic() throws Exception {
-        String key = "testSharedSingleAckedPartitionedTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-shared-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-shared-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
         final int numberOfPartitions = 3;
         admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -489,7 +470,8 @@ public class ResendRequestTest extends BrokerTestBase {
         Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
 
-        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0); // Creates new client connection
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient();
         Consumer<byte[]> consumer2 = 
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
 
@@ -529,14 +511,14 @@ public class ResendRequestTest extends BrokerTestBase {
             message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
             message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
         } while (message1 != null || message2 != null);
-        log.info(key + " messageCount1 = " + messageCount1);
-        log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
+        log.info("messageCount1 = " + messageCount1);
+        log.info("messageCount2 = " + messageCount2);
+        log.info("ackCount1 = " + ackCount1);
+        log.info("ackCount2 = " + ackCount2);
         assertEquals(messageCount1 + messageCount2, totalMessages);
 
         // 5. Ask for redeliver
-        log.info(key + ": Sent a Redeliver Message Request");
+        log.info("Sent a Redeliver Message Request");
         consumer1.redeliverUnacknowledgedMessages();
         if ((ackCount1 + ackCount2) == totalMessages) {
             return;
@@ -559,20 +541,18 @@ public class ResendRequestTest extends BrokerTestBase {
             message2 = consumer2.receive(1000, TimeUnit.MILLISECONDS);
         } while (message1 != null || message2 != null);
 
-        log.info(key + " messageCount1 = " + messageCount1);
-        log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
-        newPulsarClient.close();
+        log.info("messageCount1 = " + messageCount1);
+        log.info("messageCount2 = " + messageCount2);
+        log.info("ackCount1 = " + ackCount1);
+        log.info("ackCount2 = " + ackCount2);
         assertEquals(messageCount1 + messageCount2 + ackCount1, totalMessages);
     }
 
     @Test(timeOut = testTimeout)
     public void testFailoverSingleAckedPartitionedTopic() throws Exception {
-        String key = "testFailoverSingleAckedPartitionedTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-failover-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-failover-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
         final int numberOfPartitions = 3;
         admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -625,16 +605,16 @@ public class ResendRequestTest extends BrokerTestBase {
             message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
             message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
         } while (message1 != null || message2 != null);
-        log.info(key + " messageCount1 = " + messageCount1);
-        log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
+        log.info("messageCount1 = " + messageCount1);
+        log.info("messageCount2 = " + messageCount2);
+        log.info("ackCount1 = " + ackCount1);
+        log.info("ackCount2 = " + ackCount2);
         assertEquals(messageCount1 + messageCount2, totalMessages);
         if ((ackCount1 + ackCount2) == totalMessages) {
             return;
         }
         // 5. Ask for redeliver
-        log.info(key + ": Sent a Redeliver Message Request");
+        log.info("Sent a Redeliver Message Request");
         consumer1.redeliverUnacknowledgedMessages();
         consumer1.close();
 
@@ -648,26 +628,25 @@ public class ResendRequestTest extends BrokerTestBase {
             }
             message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
         } while (message2 != null);
-        log.info(key + " messageCount1 = " + messageCount1);
-        log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
+        log.info("messageCount1 = " + messageCount1);
+        log.info("messageCount2 = " + messageCount2);
+        log.info("ackCount1 = " + ackCount1);
+        log.info("ackCount2 = " + ackCount2);
         assertEquals(messageCount2 + ackCount1, totalMessages);
     }
 
     @Test(timeOut = testTimeout)
     public void testFailoverInactiveConsumer() throws Exception {
-        String key = "testFailoverInactiveConsumer";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-failover-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-failover-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
         // 1. producer connect
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
-        PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentTopic topicRef = (PersistentTopic) 
getTopicReference(topicName).get();
         assertNotNull(topicRef);
         assertEquals(topicRef.getProducers().size(), 1);
 
@@ -737,7 +716,7 @@ public class ResendRequestTest extends BrokerTestBase {
         Field field = ConsumerBase.class.getDeclaredField("incomingMessages");
         field.setAccessible(true);
         imq = (GrowableArrayBlockingQueue<Message<byte[]>>) field.get(c);
-        log.info("Incoming MEssage Queue: {}", imq.toList());
+        log.info("Incoming Message Queue: {}", imq.toList());
         return imq;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
index dc04e513cca..17d2fdbfaf2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -50,13 +52,13 @@ public abstract class SharedPulsarBaseTest {
     protected PulsarAdmin admin;
     protected PulsarClient pulsarClient;
 
-    private String namespace;
+    private final List<String> namespaces = new ArrayList<>();
 
     /**
      * Returns the unique namespace assigned to the current test method.
      */
     protected String getNamespace() {
-        return namespace;
+        return namespaces.get(0);
     }
 
     /**
@@ -133,21 +135,15 @@ public abstract class SharedPulsarBaseTest {
      */
     @BeforeMethod(alwaysRun = true)
     public void setupSharedTest() throws Exception {
-        String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
-        String ns = SharedPulsarCluster.TENANT_NAME + "/" + nsName;
-        namespace = ns;
-        admin.namespaces().createNamespace(ns, 
Set.of(SharedPulsarCluster.CLUSTER_NAME));
-        log.info("Created test namespace: {}", ns);
+        createNewNamespace();
     }
 
     /**
-     * Force-deletes the namespace created by {@link #setupSharedTest()}, 
including all topics in it.
+     * Force-deletes all namespaces created during the test method.
      */
     @AfterMethod(alwaysRun = true)
     public void cleanupSharedTest() throws Exception {
-        String ns = namespace;
-        if (ns != null) {
-            namespace = null;
+        for (String ns : namespaces) {
             try {
                 admin.namespaces().deleteNamespace(ns, true);
                 log.info("Deleted test namespace: {}", ns);
@@ -155,12 +151,25 @@ public abstract class SharedPulsarBaseTest {
                 log.warn("Failed to delete namespace {}: {}", ns, 
e.getMessage());
             }
         }
+        namespaces.clear();
+    }
+
+    /**
+     * Creates a new namespace under the shared tenant and registers it for 
automatic cleanup.
+     */
+    protected String createNewNamespace() throws Exception {
+        String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
+        String ns = SharedPulsarCluster.TENANT_NAME + "/" + nsName;
+        admin.namespaces().createNamespace(ns, 
Set.of(SharedPulsarCluster.CLUSTER_NAME));
+        namespaces.add(ns);
+        log.info("Created test namespace: {}", ns);
+        return ns;
     }
 
     /**
      * Generates a unique persistent topic name within the current test 
namespace.
      */
     protected String newTopicName() {
-        return "persistent://" + namespace + "/topic-" + 
UUID.randomUUID().toString().substring(0, 8);
+        return "persistent://" + getNamespace() + "/topic-" + 
UUID.randomUUID().toString().substring(0, 8);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index f12ba8b98b1..41ff4eb20cd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -148,13 +148,13 @@ public class SharedPulsarCluster {
 
         // Reduce thread pool sizes for faster startup (fewer threads to 
create)
         config.setNumIOThreads(2);
-        config.setNumOrderedExecutorThreads(1);
+        config.setNumOrderedExecutorThreads(2);
         config.setNumHttpServerThreads(4);
-        config.setBookkeeperClientNumWorkerThreads(1);
+        config.setBookkeeperClientNumWorkerThreads(2);
         config.setBookkeeperClientNumIoThreads(2);
         config.setNumCacheExecutorThreadPoolSize(1);
-        config.setManagedLedgerNumSchedulerThreads(1);
-        config.setTopicOrderedExecutorThreadNum(2);
+        config.setManagedLedgerNumSchedulerThreads(2);
+        config.setTopicOrderedExecutorThreadNum(4);
 
         // Disable the load balancer — single-broker cluster doesn't need it
         config.setLoadBalancerEnabled(false);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 40379420637..a751f1ad743 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -46,31 +47,14 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class TopicTerminationTest extends BrokerTestBase {
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        // use Pulsar binary lookup since the HTTP client shares the Pulsar 
client timer
-        isTcpLookup = true;
-        super.baseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    private final String topicName = "persistent://prop/ns-abc/topic0";
+public class TopicTerminationTest extends SharedPulsarBaseTest {
 
     @Test
     public void testSimpleTermination() throws Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -93,6 +77,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(groups = "broker")
     public void testCreateProducerOnTerminatedTopic() throws Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -114,6 +99,7 @@ public class TopicTerminationTest extends BrokerTestBase {
     }
 
     public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws 
Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -139,6 +125,8 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testTerminateWhilePublishing() throws Exception {
+        String topicName = newTopicName();
+        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -188,6 +176,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(groups = "broker")
     public void testDoubleTerminate() throws Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -207,6 +196,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(groups = "broker")
     public void testTerminatePartitionedTopic() throws Exception {
+        String topicName = newTopicName();
         admin.topics().createPartitionedTopic(topicName, 4);
 
         try {
@@ -219,11 +209,12 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationConsumer() throws Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
-        org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").subscribe();
 
         MessageId msgId1 = producer.send("test-msg-1".getBytes());
@@ -256,6 +247,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationMessageListener() throws Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -263,7 +255,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
         CountDownLatch latch = new CountDownLatch(1);
 
-        org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").messageListener(new 
MessageListener<byte[]>() {
 
                     @Override
@@ -295,6 +287,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationReader() throws Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -326,6 +319,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSimpleTerminationReaderListener() throws Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -362,6 +356,7 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSubscribeOnTerminatedTopic() throws Exception {
+        String topicName = newTopicName();
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -372,7 +367,7 @@ public class TopicTerminationTest extends BrokerTestBase {
         MessageId lastMessageId = 
admin.topics().terminateTopicAsync(topicName).get();
         assertEquals(lastMessageId, msgId2);
 
-        org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").subscribe();
 
         Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.hasReachedEndOfTopic()));
@@ -380,13 +375,14 @@ public class TopicTerminationTest extends BrokerTestBase {
 
     @Test(timeOut = 20000)
     public void testSubscribeOnTerminatedTopicWithNoMessages() throws 
Exception {
+        String topicName = newTopicName();
         pulsarClient.newProducer().topic(topicName)
             .enableBatching(false)
             .messageRoutingMode(MessageRoutingMode.SinglePartition)
             .create();
         admin.topics().terminateTopicAsync(topicName).get();
 
-        org.apache.pulsar.client.api.Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName)
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").subscribe();
 
         Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.hasReachedEndOfTopic()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
index c110512b854..3a313f7b2a6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
@@ -26,37 +26,23 @@ import java.util.List;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.common.protocol.Commands;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
-public class ChecksumTest extends BrokerTestBase {
-
-    @BeforeClass
-    @Override
-    protected void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
+public class ChecksumTest extends SharedPulsarBaseTest {
 
     @Test
     public void verifyChecksumStoredInManagedLedger() throws Exception {
-        final String topicName = "persistent://prop/ns-abc/topic0";
+        final String topicName = newTopicName();
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
 
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName).get();
+        PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName).get();
 
         ManagedLedger ledger = topic.getManagedLedger();
         ManagedCursor cursor = ledger.openCursor("test");
@@ -79,7 +65,7 @@ public class ChecksumTest extends BrokerTestBase {
 
     @Test
     public void verifyChecksumSentToConsumer() throws Exception {
-        final String topicName = "persistent://prop/ns-abc/topic-1";
+        final String topicName = newTopicName();
 
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
         RawReader reader = RawReader.create(pulsarClient, topicName, 
"sub").get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java
index deb64287406..9642d47585f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import static org.testng.Assert.fail;
 import java.util.List;
 import lombok.Cleanup;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,60 +30,49 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test
-public class PartitionKeywordCompatibilityTest extends BrokerTestBase {
-
-    @BeforeClass(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        baseSetup();
-        setupDefaultTenantAndNamespace();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        internalCleanup();
-    }
+public class PartitionKeywordCompatibilityTest extends SharedPulsarBaseTest {
 
     public void testAutoCreatePartitionTopicWithKeywordAndDeleteIt()
             throws PulsarAdminException, PulsarClientException {
+        String namespace = getNamespace();
         AutoTopicCreationOverride override = 
AutoTopicCreationOverride.builder()
                 .allowAutoTopicCreation(true)
                 .topicType("partitioned")
                 .defaultNumPartitions(1)
                 .build();
-        admin.namespaces().setAutoTopicCreation("public/default", override);
-        String topicName = "persistent://public/default/XXX-partition-0-dd";
+        admin.namespaces().setAutoTopicCreation(namespace, override);
+        String topicName = "persistent://" + namespace + "/XXX-partition-0-dd";
         @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionName("sub-1")
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscribe();
-        List<String> topics = admin.topics().getList("public/default");
-        List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList("public/default");
+        List<String> topics = admin.topics().getList(namespace);
+        List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespace);
         
Assert.assertTrue(topics.contains(TopicName.get(topicName).getPartition(0).toString()));
         Assert.assertTrue(partitionedTopicList.contains(topicName));
         consumer.close();
         PartitionedTopicStats stats = 
admin.topics().getPartitionedStats(topicName, false);
         Assert.assertEquals(stats.getSubscriptions().size(), 1);
         admin.topics().deletePartitionedTopic(topicName);
-        topics = admin.topics().getList("public/default");
-        partitionedTopicList = 
admin.topics().getPartitionedTopicList("public/default");
+        topics = admin.topics().getList(namespace);
+        partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespace);
         Assert.assertFalse(topics.contains(topicName));
         Assert.assertFalse(partitionedTopicList.contains(topicName));
     }
 
     @Test
     public void testDeletePartitionedTopicValidation() throws 
PulsarAdminException {
-        final String topicName = 
"persistent://public/default/testDeletePartitionedTopicValidation";
-        final String partitionKeywordTopic = 
"persistent://public/default/testDelete-partition-edTopicValidation";
-        final String partitionedTopic = 
"persistent://public/default/testDeletePartitionedTopicValidation-partition-0";
+        String namespace = getNamespace();
+        final String topicName = "persistent://" + namespace + 
"/testDeletePartitionedTopicValidation";
+        final String partitionKeywordTopic = "persistent://" + namespace
+                + "/testDelete-partition-edTopicValidation";
+        final String partitionedTopic = "persistent://" + namespace
+                + "/testDeletePartitionedTopicValidation-partition-0";
         try {
             admin.topics().deletePartitionedTopic(topicName);
             fail("expect not found!");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
index 840454b4e21..0ef638b4b21 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
@@ -30,7 +30,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -41,54 +41,36 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
-public class ShadowTopicTest extends BrokerTestBase {
-
-    @BeforeClass(alwaysRun = true)
-    @Override
-    protected void setup() throws Exception {
-        baseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        internalCleanup();
-    }
-
-    private String newShadowSourceTopicName() {
-        return "persistent://" + newTopicName();
-    }
+public class ShadowTopicTest extends SharedPulsarBaseTest {
 
     @Test
     public void testNonPartitionedShadowTopicSetup() throws Exception {
-        String sourceTopic = newShadowSourceTopicName();
+        String sourceTopic = newTopicName();
         String shadowTopic = sourceTopic + "-shadow";
         //1. test shadow topic setting in topic creation.
         admin.topics().createNonPartitionedTopic(sourceTopic);
         admin.topics().createShadowTopic(shadowTopic, sourceTopic);
         PersistentTopic brokerShadowTopic =
-                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(shadowTopic).get().get();
+                (PersistentTopic) getTopicIfExists(shadowTopic).get().get();
         Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof 
ShadowManagedLedgerImpl);
         
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), 
sourceTopic);
         Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), 
sourceTopic);
 
         //2. test shadow topic could be properly loaded after unload.
-        admin.namespaces().unload("prop/ns-abc");
-        
Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty());
+        admin.namespaces().unload(getNamespace());
+        Assert.assertTrue(getTopicReference(shadowTopic).isEmpty());
         Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), 
sourceTopic);
-        brokerShadowTopic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(shadowTopic).get().get();
+        brokerShadowTopic = (PersistentTopic) 
getTopicIfExists(shadowTopic).get().get();
         Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof 
ShadowManagedLedgerImpl);
         
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), 
sourceTopic);
     }
 
     @Test
     public void testPartitionedShadowTopicSetup() throws Exception {
-        String sourceTopic = newShadowSourceTopicName();
+        String sourceTopic = newTopicName();
         String shadowTopic = sourceTopic + "-shadow";
         String sourceTopicPartition = 
TopicName.get(sourceTopic).getPartition(0).toString();
         String shadowTopicPartition = 
TopicName.get(shadowTopic).getPartition(0).toString();
@@ -98,26 +80,25 @@ public class ShadowTopicTest extends BrokerTestBase {
         admin.topics().createShadowTopic(shadowTopic, sourceTopic);
         pulsarClient.newProducer().topic(shadowTopic).create().close(); 
//trigger loading partitions.
 
-        PersistentTopic brokerShadowTopic = (PersistentTopic) 
pulsar.getBrokerService()
-                .getTopicIfExists(shadowTopicPartition).get().get();
+        PersistentTopic brokerShadowTopic = (PersistentTopic) 
getTopicIfExists(shadowTopicPartition).get().get();
         Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof 
ShadowManagedLedgerImpl);
         
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), 
sourceTopicPartition);
         Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), 
sourceTopic);
 
         //2. test shadow topic could be properly loaded after unload.
-        admin.namespaces().unload("prop/ns-abc");
-        
Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty());
+        admin.namespaces().unload(getNamespace());
+        Assert.assertTrue(getTopicReference(shadowTopic).isEmpty());
 
         Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), 
sourceTopic);
         brokerShadowTopic =
-                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(shadowTopicPartition).get().get();
+                (PersistentTopic) 
getTopicIfExists(shadowTopicPartition).get().get();
         Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof 
ShadowManagedLedgerImpl);
         
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), 
sourceTopicPartition);
     }
 
     @Test
     public void testPartitionedShadowTopicProduceAndConsume() throws Exception 
{
-        String sourceTopic = newShadowSourceTopicName();
+        String sourceTopic = newTopicName();
         String shadowTopic = sourceTopic + "-shadow";
         admin.topics().createPartitionedTopic(sourceTopic, 3);
         admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -146,7 +127,7 @@ public class ShadowTopicTest extends BrokerTestBase {
 
     @Test
     public void testShadowTopicNotWritable() throws Exception {
-        String sourceTopic = newShadowSourceTopicName();
+        String sourceTopic = newTopicName();
         String shadowTopic = sourceTopic + "-shadow";
         admin.topics().createNonPartitionedTopic(sourceTopic);
         admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -156,18 +137,19 @@ public class ShadowTopicTest extends BrokerTestBase {
     }
 
     private void awaitUntilShadowReplicatorReady(String sourceTopic, String 
shadowTopic) {
-        Awaitility.await().untilAsserted(()->{
+        Awaitility.await().untilAsserted(() -> {
             PersistentTopic sourcePersistentTopic =
-                    (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(sourceTopic).get().get();
+                    (PersistentTopic) 
getTopicIfExists(sourceTopic).get().get();
             ShadowReplicator
                     replicator = (ShadowReplicator) 
sourcePersistentTopic.getShadowReplicators().get(shadowTopic);
             Assert.assertNotNull(replicator);
             Assert.assertEquals(String.valueOf(replicator.getState()), 
"Started");
         });
     }
+
     @Test
     public void testShadowTopicConsuming() throws Exception {
-        String sourceTopic = newShadowSourceTopicName();
+        String sourceTopic = newTopicName();
         String shadowTopic = sourceTopic + "-shadow";
         admin.topics().createNonPartitionedTopic(sourceTopic);
         admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -188,7 +170,7 @@ public class ShadowTopicTest extends BrokerTestBase {
 
     @Test
     public void testShadowTopicConsumingWithStringSchema() throws Exception {
-        String sourceTopic = newShadowSourceTopicName();
+        String sourceTopic = newTopicName();
         String shadowTopic = sourceTopic + "-shadow";
         admin.topics().createNonPartitionedTopic(sourceTopic);
         admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -220,9 +202,10 @@ public class ShadowTopicTest extends BrokerTestBase {
         int x;
         int y;
     }
+
     @Test
     public void testShadowTopicConsumingWithJsonSchema() throws Exception {
-        String sourceTopic = newShadowSourceTopicName();
+        String sourceTopic = newTopicName();
         String shadowTopic = sourceTopic + "-shadow";
         admin.topics().createNonPartitionedTopic(sourceTopic);
         admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -244,7 +227,7 @@ public class ShadowTopicTest extends BrokerTestBase {
 
     @Test
     public void testConsumeShadowMessageWithoutCache() throws Exception {
-        String sourceTopic = newShadowSourceTopicName();
+        String sourceTopic = newTopicName();
         String shadowTopic = sourceTopic + "-shadow";
         admin.topics().createNonPartitionedTopic(sourceTopic);
         @Cleanup Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
index c475e8aa61c..61e09664ea1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
@@ -26,16 +26,14 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Method;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.client.api.ClientBuilder;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
 import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
@@ -45,40 +43,14 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.tests.EnumValuesDataProvider;
-import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class MessageChecksumTest extends BrokerTestBase {
+public class MessageChecksumTest extends SharedPulsarBaseTest {
     private static final Logger log = 
LoggerFactory.getLogger(MessageChecksumTest.class);
 
-    @BeforeMethod
-    @Override
-    public void setup() throws Exception {
-        baseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        internalCleanup();
-    }
-
-    @Override
-    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
-        // disable connection pooling
-        clientBuilder.connectionsPerBroker(0);
-    }
-
-    @Override
-    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) 
throws PulsarClientException {
-        return PulsarTestClient.create(clientBuilder);
-    }
-
     // Enum parameter used to describe the 2 different scenarios in the
     // testChecksumCompatibilityInMixedVersionBrokerCluster test case
     enum MixedVersionScenario {
@@ -114,31 +86,31 @@ public class MessageChecksumTest extends BrokerTestBase {
     @Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider = 
"values")
     public void 
testChecksumCompatibilityInMixedVersionBrokerCluster(MixedVersionScenario 
mixedVersionScenario)
             throws Exception {
-        // GIVEN
-        final String topicName =
-                
"persistent://prop/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling";
+        final String topicName = newTopicName();
+
+        // Create a PulsarTestClient with connection pooling disabled
+        @Cleanup
+        PulsarTestClient pulsarTestClient = (PulsarTestClient) 
PulsarTestClient.create(
+                PulsarClient.builder()
+                        .serviceUrl(getBrokerServiceUrl())
+                        .connectionsPerBroker(0));
 
         if (mixedVersionScenario == 
MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION) {
             // Given, the client thinks it's connected to a broker that 
doesn't support message checksums
-            makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
+            
pulsarTestClient.setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
         }
 
-        PulsarTestClient pulsarTestClient = (PulsarTestClient) pulsarClient;
-
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarTestClient.newProducer()
                 .topic(topicName)
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+        Consumer<byte[]> consumer = pulsarTestClient.newConsumer()
                 .topic(topicName)
                 .subscriptionName("my-sub")
                 .subscribe();
 
-        // inject a CountDownLatch to the pending message callback of the 
PulsarTestClient
-        CountDownLatch messageSendingProcessedLatch = new CountDownLatch(2);
-
         // WHEN
         // a message is sent, it should succeed
         producer.send("message-1".getBytes());
@@ -169,10 +141,10 @@ public class MessageChecksumTest extends BrokerTestBase {
 
         if (mixedVersionScenario == 
MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION) {
             // Given, the client thinks it's connected to a broker that 
doesn't support message checksums
-            makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
+            
pulsarTestClient.setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
         } else {
             // Reset the overriding set in the beginning
-            resetOverridingConnectedBrokerVersion();
+            pulsarTestClient.setOverrideRemoteEndpointProtocolVersion(0);
         }
 
         // And
@@ -199,28 +171,11 @@ public class MessageChecksumTest extends BrokerTestBase {
         assertEquals(new String(msg.getData()), "message-3");
     }
 
-    private void 
makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport() {
-        // make the client think that the connected broker is of version which 
doesn't support checksum validation
-        ((PulsarTestClient) 
pulsarClient).setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
-    }
-
-    private void resetOverridingConnectedBrokerVersion() {
-        // reset the override and use the actual protocol version
-        ((PulsarTestClient) 
pulsarClient).setOverrideRemoteEndpointProtocolVersion(0);
-    }
-
-    private void 
waitUntilMessageIsPendingWithCalculatedChecksum(ProducerImpl<?> producer) {
-        // wait until the message is in the pending queue
-        Awaitility.await().untilAsserted(() -> {
-            assertEquals(producer.getPendingQueueSize(), 1);
-        });
-    }
-
     @Test
     public void testTamperingMessageIsDetected() throws Exception {
         // GIVEN
         ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
-                
.topic("persistent://prop/ns-abc/testTamperingMessageIsDetected")
+                .topic(newTopicName())
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index 0883c329b21..4393e2f5ec2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -23,9 +23,8 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.util.HashSet;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -37,29 +36,15 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
 @Test(groups = "broker-impl")
-public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase {
+public class UnAcknowledgedMessagesTimeoutTest extends SharedPulsarBaseTest {
     private static final Logger log = 
LoggerFactory.getLogger(UnAcknowledgedMessagesTimeoutTest.class);
     private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
 
-    @Override
-    @BeforeMethod
-    public void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @Override
-    @AfterMethod(alwaysRun = true)
-    public void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
     @DataProvider(name = "variationsRedeliveryTracker")
     public static Object[][] variationsRedeliveryTracker() {
         return new Object[][]{
@@ -71,10 +56,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
     @Test(dataProvider = "variationsRedeliveryTracker")
     public void testExclusiveSingleAckedNormalTopic(boolean 
isRedeliveryTracker) throws Exception {
-        String key = "testExclusiveSingleAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
 
         // 1. producer connect
@@ -110,7 +94,7 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
             message = consumer.receive(500, TimeUnit.MILLISECONDS);
         }
         long size = ((ConsumerImpl<?>) 
consumer).getUnAckedMessageTracker().size();
-        log.info(key + " Unacked Message Tracker size is " + size);
+        log.info(" Unacked Message Tracker size is " + size);
         assertEquals(size, totalMessages / 2);
 
         // Blocking call, redeliver should kick in
@@ -131,17 +115,16 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
         } while (message != null);
 
         size = ((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().size();
-        log.info(key + " Unacked Message Tracker size is " + size);
+        log.info(" Unacked Message Tracker size is " + size);
         assertEquals(size, 0);
         assertEquals(hSet.size(), totalMessages);
     }
 
     @Test(dataProvider = "variationsRedeliveryTracker")
     public void testExclusiveCumulativeAckedNormalTopic(boolean 
isRedeliveryTracker) throws Exception {
-        String key = "testExclusiveCumulativeAckedNormalTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
 
         // 1. producer connect
@@ -194,10 +177,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
     @Test(dataProvider = "variationsRedeliveryTracker")
     public void testSharedSingleAckedPartitionedTopic(boolean 
isRedeliveryTracker) throws Exception {
-        String key = "testSharedSingleAckedPartitionedTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-shared-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-shared-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 20;
         final int numberOfPartitions = 3;
         admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -248,36 +230,36 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
         int ackCount1 = 0;
         int ackCount2 = messageCount2;
 
-        log.info(key + " messageCount1 = " + messageCount1);
-        log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
+        log.info(" messageCount1 = " + messageCount1);
+        log.info(" messageCount2 = " + messageCount2);
+        log.info(" ackCount1 = " + ackCount1);
+        log.info(" ackCount2 = " + ackCount2);
         assertEquals(messageCount1 + messageCount2, totalMessages);
 
         // 5. Check if Messages redelivered again
         // Since receive is a blocking call hoping that timeout will kick in
         Thread.sleep((int) (ackTimeOutMillis * 1.1));
-        log.info(key + " Timeout should be triggered now");
+        log.info(" Timeout should be triggered now");
         messageCount1 = receiveAllMessage(consumer1, true);
         messageCount2 += receiveAllMessage(consumer2, false);
 
         ackCount1 = messageCount1;
 
-        log.info(key + " messageCount1 = " + messageCount1);
-        log.info(key + " messageCount2 = " + messageCount2);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
+        log.info(" messageCount1 = " + messageCount1);
+        log.info(" messageCount2 = " + messageCount2);
+        log.info(" ackCount1 = " + ackCount1);
+        log.info(" ackCount2 = " + ackCount2);
         assertEquals(messageCount1 + messageCount2, totalMessages);
         assertEquals(ackCount1 + messageCount2, totalMessages);
 
         Thread.sleep((int) (ackTimeOutMillis * 2));
 
         // Since receive is a blocking call hoping that timeout will kick in
-        log.info(key + " Timeout should be triggered again");
+        log.info(" Timeout should be triggered again");
         ackCount1 += receiveAllMessage(consumer1, true);
         ackCount2 += receiveAllMessage(consumer2, true);
-        log.info(key + " ackCount1 = " + ackCount1);
-        log.info(key + " ackCount2 = " + ackCount2);
+        log.info(" ackCount1 = " + ackCount1);
+        log.info(" ackCount2 = " + ackCount2);
         assertEquals(ackCount1 + ackCount2, totalMessages);
     }
 
@@ -300,10 +282,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
     @Test(dataProvider = "variationsRedeliveryTracker")
     public void testFailoverSingleAckedPartitionedTopic(boolean 
isRedeliveryTracker) throws Exception {
-        String key = "testFailoverSingleAckedPartitionedTopic";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key + 
UUID.randomUUID().toString();
-        final String subscriptionName = "my-failover-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-failover-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 10;
         final int numberOfPartitions = 3;
         admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -397,7 +378,7 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
         // 5. Check if Messages redelivered again
         Thread.sleep(ackTimeOutMillis * 2);
-        log.info(key + " Timeout should be triggered now");
+        log.info(" Timeout should be triggered now");
         messagesReceived = 0;
         while (true) {
             Message<byte[]> message1 = consumer1.receive(500, 
TimeUnit.MILLISECONDS);
@@ -439,10 +420,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
     @Test
     public void testCheckUnAcknowledgedMessageTimer() throws 
PulsarClientException, InterruptedException {
-        String key = "testCheckUnAcknowledgedMessageTimer";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 3;
 
         // 1. producer connect
@@ -494,10 +474,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
     public void testCheckUnAcknowledgedMessageRedeliveryTimer(long 
ackTimeOutMillis, long minDelayMs,
                                                               long maxDelayMs, 
int multiplier)
             throws PulsarClientException, InterruptedException {
-        String key = "testCheckUnAcknowledgedMessageRedeliveryTimer";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
         final int totalMessages = 3;
 
         // 1. producer connect
@@ -544,7 +523,7 @@ public class UnAcknowledgedMessagesTimeoutTest extends 
BrokerTestBase {
 
     @Test
     public void testSingleMessageBatch() throws Exception {
-        String topicName = "prop/ns-abc/topic-estSingleMessageBatch";
+        String topicName = newTopicName();
 
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .topic(topicName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index 8f10281bcd0..68a49427bb8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -34,8 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -53,27 +52,13 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
-public class ZeroQueueSizeTest extends BrokerTestBase {
+public class ZeroQueueSizeTest extends SharedPulsarBaseTest {
     private static final Logger log = 
LoggerFactory.getLogger(ZeroQueueSizeTest.class);
     private final int totalMessages = 10;
 
-    @BeforeClass
-    @Override
-    public void setup() throws Exception {
-        baseSetup();
-    }
-
-    @AfterClass(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        internalCleanup();
-    }
-
     @Test
     public void validQueueSizeConfig() {
         pulsarClient.newConsumer().receiverQueueSize(0);
@@ -86,9 +71,8 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test(expectedExceptions = 
PulsarClientException.InvalidConfigurationException.class)
     public void zeroQueueSizeReceiveAsyncInCompatibility() throws 
PulsarClientException {
-        String key = "zeroQueueSizeReceiveAsyncInCompatibility";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .receiverQueueSize(0).subscribe();
@@ -97,9 +81,8 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test(expectedExceptions = PulsarClientException.class)
     public void zeroQueueSizePartitionedTopicInCompatibility() throws 
PulsarClientException, PulsarAdminException {
-        String key = "zeroQueueSizePartitionedTopicInCompatibility";
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
         int numberOfPartitions = 3;
         admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
         
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).receiverQueueSize(0).subscribe();
@@ -107,12 +90,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test
     public void zeroQueueSizeNormalConsumer() throws PulsarClientException {
-        String key = "nonZeroQueueSizeNormalConsumer";
-
-        // 1. Config
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
 
         // 2. Create Producer
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
@@ -144,12 +124,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test
     public void zeroQueueSizeConsumerListener() throws Exception {
-        String key = "zeroQueueSizeConsumerListener";
-
-        // 1. Config
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
 
         // 2. Create Producer
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
@@ -188,12 +165,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test
     public void zeroQueueSizeSharedSubscription() throws PulsarClientException 
{
-        String key = "zeroQueueSizeSharedSubscription";
-
-        // 1. Config
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
 
         // 2. Create Producer
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
@@ -229,12 +203,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test
     public void zeroQueueSizeFailoverSubscription() throws 
PulsarClientException {
-        String key = "zeroQueueSizeFailoverSubscription";
-
-        // 1. Config
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
 
         // 2. Create Producer
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
@@ -288,12 +259,13 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
     public void testFailedZeroQueueSizeBatchMessage() throws 
PulsarClientException {
 
         int batchMessageDelayMs = 100;
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://prop/ns-abc/topic1")
+        String topicName = newTopicName();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(0)
                 .subscribe();
 
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
-            .topic("persistent://prop/ns-abc/topic1")
+            .topic(topicName)
             .messageRoutingMode(MessageRoutingMode.SinglePartition);
 
         if (batchMessageDelayMs != 0) {
@@ -324,7 +296,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test
     public void testZeroQueueSizeMessageRedelivery() throws 
PulsarClientException {
-        final String topic = 
"persistent://prop/ns-abc/testZeroQueueSizeMessageRedelivery";
+        final String topic = newTopicName();
         Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
             .topic(topic)
             .receiverQueueSize(0)
@@ -356,7 +328,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test
     public void testZeroQueueSizeMessageRedeliveryForListener() throws 
Exception {
-        final String topic = 
"persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForListener";
+        final String topic = newTopicName();
         final int messages = 10;
         final CountDownLatch latch = new CountDownLatch(messages * 2);
         Set<Integer> receivedMessages = new HashSet<>();
@@ -394,7 +366,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
     @Test
     public void testZeroQueueSizeMessageRedeliveryForAsyncReceive()
             throws PulsarClientException, ExecutionException, 
InterruptedException {
-        final String topic = 
"persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForAsyncReceive";
+        final String topic = newTopicName();
         Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
             .topic(topic)
             .receiverQueueSize(0)
@@ -431,8 +403,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
     @Test(timeOut = 30000)
     public void testZeroQueueGetExceptionWhenReceiveBatchMessage() throws 
PulsarClientException {
 
-        final String topic = BrokerTestUtil.newUniqueName(
-                
"persistent://prop/ns-abc/testZeroQueueGetExceptionWhenReceiveBatchMessage-");
+        final String topic = newTopicName();
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
                 
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(0)
                 .subscribe();
@@ -465,7 +436,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test(timeOut = 30000)
     public void testPauseAndResume() throws Exception {
-        final String topicName = 
"persistent://prop/ns-abc/zero-queue-pause-and-resume";
+        final String topicName = newTopicName();
         final String subName = "sub";
 
         AtomicReference<CountDownLatch> latch = new AtomicReference<>(new 
CountDownLatch(1));
@@ -501,7 +472,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test(timeOut = 30000)
     public void testPauseAndResumeWithUnloading() throws Exception {
-        final String topicName = 
"persistent://prop/ns-abc/zero-queue-pause-and-resume-with-unloading";
+        final String topicName = newTopicName();
         final String subName = "sub";
 
         AtomicReference<CountDownLatch> latch = new AtomicReference<>(new 
CountDownLatch(1));
@@ -540,7 +511,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test(timeOut = 30000)
     public void testPauseAndResumeNoReconnection() throws Exception {
-        final String topicName = 
"persistent://prop/ns-abc/zero-queue-pause-and-resume-no-reconnection";
+        final String topicName = newTopicName();
         final String subName = "sub";
 
         final Object object = new Object();
@@ -589,12 +560,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
 
     @Test(timeOut = 30000)
     public void 
testZeroQueueSizeConsumerWithPayloadProcessorReceiveBatchMessage() throws 
Exception {
-        String key = "payloadProcessorReceiveBatchMessage";
-
-        // 1. Config
-        final String topicName = "persistent://prop/ns-abc/topic-" + key;
-        final String subscriptionName = "my-ex-subscription-" + key;
-        final String messagePredicate = "my-message-" + key + "-";
+        final String topicName = newTopicName();
+        final String subscriptionName = "my-ex-subscription";
+        final String messagePredicate = "my-message-";
 
         // 2. Create Producer
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)


Reply via email to