This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3222c561a77 [cleanup] Convert 10 test classes to SharedPulsarBaseTest (#25327)
3222c561a77 is described below
commit 3222c561a779bae3eea5a56569cdb4eb70b171a8
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 16 17:49:23 2026 -0700
[cleanup] Convert 10 test classes to SharedPulsarBaseTest (#25327)
---
.../org/apache/pulsar/client/api/BytesKeyTest.java | 23 ++++----------
.../pulsar/client/api/ConsumerAckListTest.java | 26 ++++------------
.../pulsar/client/api/ConsumerCleanupTest.java | 29 +++++-------------
.../pulsar/client/api/CustomMessageIdTest.java | 23 +++-----------
.../client/api/FailoverSubscriptionTest.java | 22 +++-----------
.../api/NonPartitionedTopicExpectedTest.java | 35 +++++++---------------
.../client/api/PersistentTopicTerminateTest.java | 24 +++------------
.../pulsar/client/api/ProducerCleanupTest.java | 23 ++------------
.../pulsar/client/api/ProducerQueueSizeTest.java | 20 ++-----------
.../client/api/ReplicateSubscriptionTest.java | 27 +++--------------
10 files changed, 52 insertions(+), 200 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java
index fdc34add89b..69453f9265e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BytesKeyTest.java
@@ -20,38 +20,25 @@ package org.apache.pulsar.client.api;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class BytesKeyTest extends ProducerConsumerBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class BytesKeyTest extends SharedPulsarBaseTest {
private void byteKeysTest(boolean batching) throws Exception {
Random r = new Random(0);
+ String topic = newTopicName();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name").subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(batching)
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.SECONDS)
.batchingMaxMessages(Integer.MAX_VALUE)
- .topic("persistent://my-property/my-ns/my-topic1").create();
+ .topic(topic).create();
byte[] byteKey = new byte[1000];
r.nextBytes(byteKey);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
index ec5540dbbd0..3d085425b13 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerAckListTest.java
@@ -25,27 +25,13 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class ConsumerAckListTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ConsumerAckListTest extends SharedPulsarBaseTest {
@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
@@ -61,7 +47,7 @@ public class ConsumerAckListTest extends ProducerConsumerBase
{
}
private void ackListMessage(boolean isBatch, boolean isPartitioned,
boolean ackReceiptEnabled) throws Exception {
- final String topic = "persistent://my-property/my-ns/batch-ack-" +
UUID.randomUUID();
+ final String topic = newTopicName();
final String subName = "testBatchAck-sub" + UUID.randomUUID();
final int messageNum = ThreadLocalRandom.current().nextInt(50, 100);
if (isPartitioned) {
@@ -113,9 +99,9 @@ public class ConsumerAckListTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testAckMessageInAnotherTopic() throws Exception {
final String[] topics = {
-
"persistent://my-property/my-ns/test-ack-message-in-other-topic1" +
UUID.randomUUID(),
-
"persistent://my-property/my-ns/test-ack-message-in-other-topic2" +
UUID.randomUUID(),
-
"persistent://my-property/my-ns/test-ack-message-in-other-topic3" +
UUID.randomUUID()
+ newTopicName(),
+ newTopicName(),
+ newTopicName()
};
@Cleanup final Consumer<String> allTopicsConsumer =
pulsarClient.newConsumer(Schema.STRING)
.topic(topics)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
index 0cbfde06381..c043182f6b9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
@@ -19,32 +19,16 @@
package org.apache.pulsar.client.api;
import io.netty.util.HashedWheelTimer;
-import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class ConsumerCleanupTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
- isTcpLookup = true;
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ConsumerCleanupTest extends SharedPulsarBaseTest {
@DataProvider(name = "ackReceiptEnabled")
public Object[][] ackReceiptEnabled() {
@@ -55,9 +39,12 @@ public class ConsumerCleanupTest extends
ProducerConsumerBase {
public void testAllTimerTaskShouldCanceledAfterConsumerClosed(boolean
ackReceiptEnabled)
throws PulsarClientException, InterruptedException {
@Cleanup
- PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 1);
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+ .statsInterval(1, TimeUnit.SECONDS)
+ .build();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
- .topic("persistent://public/default/" +
UUID.randomUUID().toString())
+ .topic(newTopicName())
.subscriptionName("test")
.isAckReceiptEnabled(ackReceiptEnabled)
.subscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
index d64a2d8b7e4..e8a73aa5a32 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
@@ -25,26 +25,12 @@ import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class CustomMessageIdTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class CustomMessageIdTest extends SharedPulsarBaseTest {
@DataProvider
public static Object[][] enableBatching() {
@@ -56,7 +42,7 @@ public class CustomMessageIdTest extends ProducerConsumerBase
{
@Test
public void testSeek() throws Exception {
- final var topic = "persistent://my-property/my-ns/test-seek-" +
System.currentTimeMillis();
+ final var topic = newTopicName();
@Cleanup final var producer =
pulsarClient.newProducer(Schema.INT32).topic(topic).create();
final var msgIds = new ArrayList<SimpleMessageIdImpl>();
for (int i = 0; i < 10; i++) {
@@ -72,8 +58,7 @@ public class CustomMessageIdTest extends ProducerConsumerBase
{
@Test(dataProvider = "enableBatching")
public void testAcknowledgment(boolean enableBatching) throws Exception {
- final var topic = "persistent://my-property/my-ns/test-ack-"
- + enableBatching + System.currentTimeMillis();
+ final var topic = newTopicName();
final var producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(enableBatching)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java
index 6263193b0a4..04d6aaf39ed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/FailoverSubscriptionTest.java
@@ -23,38 +23,24 @@ import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-api")
-public class FailoverSubscriptionTest extends ProducerConsumerBase {
- @BeforeClass(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class FailoverSubscriptionTest extends SharedPulsarBaseTest {
@Test(timeOut = 30_000, invocationCount = 5)
public void testWaitingCursorsCountAfterSwitchingActiveConsumers() throws
Exception {
- final String tp =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+ final String tp = newTopicName();
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(tp);
admin.topics().createSubscription(tp, subscription,
MessageId.earliest);
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopic(tp, false).join().get();
+ PersistentTopic topic = (PersistentTopic) getTopic(tp,
false).join().get();
Map<String, ConsumerImpl<byte[]>> consumerMap = new HashMap<>();
ConsumerImpl<byte[]> firstConsumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(tp)
.subscriptionType(SubscriptionType.Failover).subscriptionName(subscription).subscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
index 7b0edd314d0..928a23b0a4f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java
@@ -19,36 +19,22 @@
package org.apache.pulsar.client.api;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
-public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class NonPartitionedTopicExpectedTest extends SharedPulsarBaseTest {
@Test
public void testWhenNonPartitionedTopicExists() throws Exception {
- final String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String topic = newTopicName();
admin.topics().createNonPartitionedTopic(topic);
ProducerBuilderImpl<String> producerBuilder =
(ProducerBuilderImpl<String>)
pulsarClient.newProducer(Schema.STRING).topic(topic);
@@ -62,7 +48,7 @@ public class NonPartitionedTopicExpectedTest extends
ProducerConsumerBase {
@Test
public void testWhenPartitionedTopicExists() throws Exception {
- final String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 2);
ProducerBuilderImpl<String> producerBuilder =
(ProducerBuilderImpl<String>)
pulsarClient.newProducer(Schema.STRING).topic(topic);
@@ -89,8 +75,8 @@ public class NonPartitionedTopicExpectedTest extends
ProducerConsumerBase {
@Test(dataProvider = "topicTypes")
public void testWhenTopicNotExists(TopicType topicType) throws Exception {
- final String namespace = "public/default";
- final String topic = BrokerTestUtil.newUniqueName("persistent://" +
namespace + "/tp");
+ final String namespace = getNamespace();
+ final String topic = newTopicName();
final TopicName topicName = TopicName.get(topic);
AutoTopicCreationOverride.Builder policyBuilder =
AutoTopicCreationOverride.builder()
.topicType(topicType.toString()).allowAutoTopicCreation(true);
@@ -106,9 +92,10 @@ public class NonPartitionedTopicExpectedTest extends
ProducerConsumerBase {
// Verify: create successfully.
Producer producer = producerBuilder.create();
// Verify: only create non-partitioned topic.
-
Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
- .partitionedTopicExists(topicName));
-
Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join());
+
Assert.assertFalse(SharedPulsarCluster.get().getPulsarService().getPulsarResources()
+
.getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName));
+
Assert.assertTrue(SharedPulsarCluster.get().getPulsarService().getNamespaceService()
+ .checkNonPartitionedTopicExists(topicName).join());
// cleanup.
producer.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
index 69c012b7622..4cd9371cd9d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PersistentTopicTerminateTest.java
@@ -25,34 +25,18 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
@Slf4j
-public class PersistentTopicTerminateTest extends ProducerConsumerBase {
-
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class PersistentTopicTerminateTest extends SharedPulsarBaseTest {
@Test
public void testRecoverAfterTerminate() throws Exception {
- final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String topicName = newTopicName();
final String subscriptionName = "s1";
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
@@ -83,7 +67,7 @@ public class PersistentTopicTerminateTest extends
ProducerConsumerBase {
admin.topics().skipAllMessages(topicName, subscriptionName);
Awaitility.await().untilAsserted(() -> {
PersistentTopic persistentTopic =
- (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ (PersistentTopic) getTopic(topicName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
CompletableFuture<Void> trimLedgersFuture = new
CompletableFuture<>();
ml.trimConsumedLedgersInBackground(trimLedgersFuture);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
index f4daba3bf05..2a7fcb3eb17 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
@@ -19,36 +19,19 @@
package org.apache.pulsar.client.api;
import io.netty.util.HashedWheelTimer;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class ProducerCleanupTest extends ProducerConsumerBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
- isTcpLookup = true;
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ProducerCleanupTest extends SharedPulsarBaseTest {
@Test
public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws
PulsarClientException, InterruptedException {
Producer<byte[]> producer = pulsarClient.newProducer()
- .topic("persistent://public/default/" +
UUID.randomUUID().toString())
+ .topic(newTopicName())
.sendTimeout(1, TimeUnit.SECONDS)
.create();
producer.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java
index 0c470ae5875..2e17913599d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerQueueSizeTest.java
@@ -22,25 +22,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-public class ProducerQueueSizeTest extends ProducerConsumerBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ProducerQueueSizeTest extends SharedPulsarBaseTest {
@DataProvider(name = "matrix")
public Object[][] matrix() {
@@ -62,7 +48,7 @@ public class ProducerQueueSizeTest extends
ProducerConsumerBase {
@Cleanup
PulsarClient client = PulsarClient.builder()
- .serviceUrl(brokerUrl.toString())
+ .serviceUrl(getWebServiceUrl())
.memoryLimit(10, SizeUnit.KILO_BYTES)
.build();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
index 327081bf1b9..dd468c80d37 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
@@ -26,34 +26,15 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-public class ReplicateSubscriptionTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
- }
+public class ReplicateSubscriptionTest extends SharedPulsarBaseTest {
@DataProvider
public Object[] replicateSubscriptionState() {
@@ -67,7 +48,7 @@ public class ReplicateSubscriptionTest extends
ProducerConsumerBase {
@Test(dataProvider = "replicateSubscriptionState")
public void testReplicateSubscriptionState(Boolean
replicateSubscriptionState)
throws Exception {
- String topic = "persistent://my-property/my-ns/" + System.nanoTime();
+ String topic = newTopicName();
String subName = "sub-" + System.nanoTime();
ConsumerBuilder<String> consumerBuilder =
pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
@@ -79,7 +60,7 @@ public class ReplicateSubscriptionTest extends
ProducerConsumerBase {
assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(),
replicateSubscriptionState);
@Cleanup
Consumer<String> ignored = consumerBuilder.subscribe();
- CompletableFuture<Optional<Topic>> topicIfExists =
pulsar.getBrokerService().getTopicIfExists(topic);
+ CompletableFuture<Optional<Topic>> topicIfExists =
getTopicIfExists(topic);
assertThat(topicIfExists)
.succeedsWithin(3, TimeUnit.SECONDS)
.matches(optionalTopic -> {