This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f6bc233dc3 [cleanup] Convert 13 test classes to SharedPulsarBaseTest (#25331)
7f6bc233dc3 is described below
commit 7f6bc233dc37d0bd981f9bfe3ccc104069ac39a4
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Mar 17 11:15:53 2026 -0700
[cleanup] Convert 13 test classes to SharedPulsarBaseTest (#25331)
---
.../impl/ConsumerUnsubscribeIntegrationTest.java | 24 +--
.../api/ExposeMessageRedeliveryCountTest.java | 24 +--
.../apache/pulsar/client/api/MemoryLimitTest.java | 22 +--
.../api/SimpleTypedProducerConsumerTest.java | 179 ++++++++++++---------
.../client/impl/CompactedOutBatchMessageTest.java | 21 +--
.../client/impl/ConsumeBaseExceptionTest.java | 23 +--
.../pulsar/client/impl/ConsumerCloseTest.java | 26 +--
.../impl/ConsumerDedupPermitsUpdateTest.java | 22 +--
.../client/impl/ConsumerMemoryLimitTest.java | 21 +--
.../client/impl/DispatchAccordingPermitsTest.java | 21 +--
.../impl/HierarchyTopicAutoCreationTest.java | 33 ++--
.../client/impl/ProduceWithMessageIdTest.java | 16 +-
.../pulsar/client/impl/TopicFromMessageTest.java | 42 +++--
13 files changed, 178 insertions(+), 296 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java
index 547958de845..ef33a61139d 100644
---
a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/ConsumerUnsubscribeIntegrationTest.java
@@ -21,38 +21,22 @@ package org.apache.bookkeeper.mledger.impl;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-impl")
-public class ConsumerUnsubscribeIntegrationTest extends ProducerConsumerBase {
-
- @BeforeClass(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ConsumerUnsubscribeIntegrationTest extends SharedPulsarBaseTest {
@Test
public void testUnSubscribeWhenCursorNotExists() throws Exception {
- final String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String topic = newTopicName();
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, subscription,
MessageId.earliest);
@@ -65,7 +49,7 @@ public class ConsumerUnsubscribeIntegrationTest extends
ProducerConsumerBase {
consumer.acknowledge(consumer.receive(2, TimeUnit.SECONDS));
PersistentTopic persistentTopic =
- (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).join().get();
+ (PersistentTopic) getTopic(topic, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().get(subscription);
Awaitility.await().untilAsserted(() -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
index a9e21f77aa1..0ddaad34e0d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ExposeMessageRedeliveryCountTest.java
@@ -21,32 +21,18 @@ package org.apache.pulsar.client.api;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class ExposeMessageRedeliveryCountTest extends ProducerConsumerBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ExposeMessageRedeliveryCountTest extends SharedPulsarBaseTest {
@Test(timeOut = 30000)
public void testRedeliveryCount() throws PulsarClientException {
- final String topic = "persistent://my-property/my-ns/redeliveryCount";
+ final String topic = newTopicName();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
@@ -81,7 +67,7 @@ public class ExposeMessageRedeliveryCountTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testRedeliveryCountWithPartitionedTopic() throws
PulsarClientException, PulsarAdminException {
- final String topic =
"persistent://my-property/my-ns/redeliveryCount.partitioned";
+ final String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, 3);
@@ -119,7 +105,7 @@ public class ExposeMessageRedeliveryCountTest extends
ProducerConsumerBase {
@Test(timeOut = 30000)
public void testRedeliveryCountWhenConsumerDisconnected() throws
PulsarClientException {
- String topic =
"persistent://my-property/my-ns/testRedeliveryCountWhenConsumerDisconnected";
+ String topic = newTopicName();
Consumer<String> consumer0 = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
index 60eb79e77bc..6ed5a1118b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MemoryLimitTest.java
@@ -23,17 +23,16 @@ import static org.testng.Assert.fail;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import
org.apache.pulsar.client.api.PulsarClientException.MemoryBufferIsFullError;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarTestClient;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class MemoryLimitTest extends ProducerConsumerBase {
+public class MemoryLimitTest extends SharedPulsarBaseTest {
@DataProvider(name = "batchingAndMemoryLimit")
public Object[][] provider() {
@@ -44,26 +43,13 @@ public class MemoryLimitTest extends ProducerConsumerBase {
};
}
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test(dataProvider = "batchingAndMemoryLimit")
public void testRejectMessages(boolean batching, int memoryLimit)
throws Exception {
String topic = newTopicName();
ClientBuilder clientBuilder = PulsarClient.builder()
- .serviceUrl(pulsar.getBrokerServiceUrl())
+ .serviceUrl(getBrokerServiceUrl())
.memoryLimit(memoryLimit, SizeUnit.KILO_BYTES);
@Cleanup
@@ -120,7 +106,7 @@ public class MemoryLimitTest extends ProducerConsumerBase {
String t2 = newTopicName();
ClientBuilder clientBuilder = PulsarClient.builder()
- .serviceUrl(pulsar.getBrokerServiceUrl())
+ .serviceUrl(getBrokerServiceUrl())
.memoryLimit(memoryLimit, SizeUnit.KILO_BYTES);
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 293d3adcb87..d03f6746d1e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -32,7 +32,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -46,43 +47,37 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
-public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
+public class SimpleTypedProducerConsumerTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class);
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
+ private <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T
receivedMessage, T expectedMessage) {
+ Assert.assertEquals(receivedMessage, expectedMessage,
+ "Received message " + receivedMessage + " did not match the
expected message " + expectedMessage);
+ Assert.assertTrue(messagesReceived.add(receivedMessage), "Received
duplicate message " + receivedMessage);
}
@Test
public void testJsonProducerAndConsumer() throws Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --", "testJsonProducerAndConsumer");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
JSONSchema<JsonEncodedPojo> jsonSchema =
JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
Consumer<JsonEncodedPojo> consumer = pulsarClient
.newConsumer(jsonSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name")
.subscribe();
Producer<JsonEncodedPojo> producer = pulsarClient
.newProducer(jsonSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create();
for (int i = 0; i < 10; i++) {
@@ -103,24 +98,28 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
consumer.acknowledgeCumulative(msg);
consumer.close();
- SchemaRegistry.SchemaAndMetadata storedSchema =
pulsar.getSchemaRegistryService()
- .getSchema("my-property/my-ns/my-topic1")
+ SchemaRegistry.SchemaAndMetadata storedSchema =
SharedPulsarCluster.get().getPulsarService()
+ .getSchemaRegistryService()
+ .getSchema(schemaKey)
.get();
Assert.assertEquals(storedSchema.schema.getData(),
jsonSchema.getSchemaInfo().getSchema());
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --", "testJsonProducerAndConsumer");
}
@Test
public void testJsonProducerAndConsumerWithPrestoredSchema() throws
Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --",
"testJsonProducerAndConsumerWithPrestoredSchema");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
JSONSchema<JsonEncodedPojo> jsonSchema =
JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
- pulsar.getSchemaRegistryService()
- .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+ SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService()
+ .putSchemaIfAbsent(schemaKey,
SchemaData.builder()
.type(SchemaType.JSON)
.isDeleted(false)
@@ -134,36 +133,40 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
Consumer<JsonEncodedPojo> consumer = pulsarClient
.newConsumer(jsonSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name")
.subscribe();
Producer<JsonEncodedPojo> producer = pulsarClient
.newProducer(jsonSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create();
consumer.close();
producer.close();
- SchemaRegistry.SchemaAndMetadata storedSchema =
pulsar.getSchemaRegistryService()
- .getSchema("my-property/my-ns/my-topic1")
+ SchemaRegistry.SchemaAndMetadata storedSchema =
SharedPulsarCluster.get().getPulsarService()
+ .getSchemaRegistryService()
+ .getSchema(schemaKey)
.get();
Assert.assertEquals(storedSchema.schema.getData(),
jsonSchema.getSchemaInfo().getSchema());
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --",
"testJsonProducerAndConsumerWithPrestoredSchema");
}
@Test
public void testWrongCorruptedSchema() throws Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --", "testWrongCorruptedSchema");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
byte[] randomSchemaBytes = "hello".getBytes();
try {
- pulsar.getSchemaRegistryService()
- .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+
SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService()
+ .putSchemaIfAbsent(schemaKey,
SchemaData.builder()
.type(SchemaType.JSON)
.isDeleted(false)
@@ -179,25 +182,28 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
assertTrue(e.getCause() instanceof InvalidSchemaDataException);
}
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --", "testWrongCorruptedSchema");
}
@Test
public void testProtobufProducerAndConsumer() throws Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --", "testProtobufProducerAndConsumer");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage>
protobufSchema =
ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class);
Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessage>
consumer = pulsarClient
.newConsumer(protobufSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name")
.subscribe();
Producer<org.apache.pulsar.client.api.schema.proto.Test.TestMessage>
producer = pulsarClient
.newProducer(protobufSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create();
for (int i = 0; i < 10; i++) {
@@ -222,24 +228,28 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
consumer.acknowledgeCumulative(msg);
consumer.close();
- SchemaRegistry.SchemaAndMetadata storedSchema =
pulsar.getSchemaRegistryService()
- .getSchema("my-property/my-ns/my-topic1")
+ SchemaRegistry.SchemaAndMetadata storedSchema =
SharedPulsarCluster.get().getPulsarService()
+ .getSchemaRegistryService()
+ .getSchema(schemaKey)
.get();
Assert.assertEquals(storedSchema.schema.getData(),
protobufSchema.getSchemaInfo().getSchema());
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --", "testProtobufProducerAndConsumer");
}
@Test(expectedExceptions = {PulsarClientException.class})
public void testProtobufConsumerWithWrongPrestoredSchema() throws
Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --",
"testProtobufConsumerWithWrongPrestoredSchema");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
ProtobufSchema<org.apache.pulsar.client.api.schema.proto.Test.TestMessage>
schema =
ProtobufSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessage.class);
- pulsar.getSchemaRegistryService()
- .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+ SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService()
+ .putSchemaIfAbsent(schemaKey,
SchemaData.builder()
.type(SchemaType.PROTOBUF)
.isDeleted(false)
@@ -255,16 +265,19 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
.newConsumer(AvroSchema.of
(SchemaDefinition.<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong>builder().
withPojo(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class).build()))
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name")
.subscribe();
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --",
"testProtobufConsumerWithWrongPrestoredSchema");
}
@Test
public void testAvroProducerAndConsumer() throws Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --", "testAvroProducerAndConsumer");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
@@ -272,13 +285,13 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
Consumer<AvroEncodedPojo> consumer = pulsarClient
.newConsumer(avroSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name")
.subscribe();
Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create();
for (int i = 0; i < 10; i++) {
@@ -299,19 +312,23 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
consumer.acknowledgeCumulative(msg);
consumer.close();
- SchemaRegistry.SchemaAndMetadata storedSchema =
pulsar.getSchemaRegistryService()
- .getSchema("my-property/my-ns/my-topic1")
+ SchemaRegistry.SchemaAndMetadata storedSchema =
SharedPulsarCluster.get().getPulsarService()
+ .getSchemaRegistryService()
+ .getSchema(schemaKey)
.get();
Assert.assertEquals(storedSchema.schema.getData(),
avroSchema.getSchemaInfo().getSchema());
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --", "testAvroProducerAndConsumer");
}
@Test(expectedExceptions = {PulsarClientException.class})
public void testAvroConsumerWithWrongRestoredSchema() throws Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --",
"testAvroConsumerWithWrongRestoredSchema");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
byte[] randomSchemaBytes = ("{\n"
+ " \"type\": \"record\",\n"
@@ -323,8 +340,8 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
+ " ]\n"
+ "} ").getBytes();
- pulsar.getSchemaRegistryService()
- .putSchemaIfAbsent("my-property/my-ns/my-topic1",
+ SharedPulsarCluster.get().getPulsarService().getSchemaRegistryService()
+ .putSchemaIfAbsent(schemaKey,
SchemaData.builder()
.type(SchemaType.AVRO)
.isDeleted(false)
@@ -338,11 +355,11 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
Consumer<AvroEncodedPojo> consumer = pulsarClient
.newConsumer(AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).withAlwaysAllowNull(false).build()))
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name")
.subscribe();
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --",
"testAvroConsumerWithWrongRestoredSchema");
}
public static class AvroEncodedPojo {
@@ -433,7 +450,10 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
@Test
public void testAvroProducerAndAutoSchemaConsumer() throws Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --",
"testAvroProducerAndAutoSchemaConsumer");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
@@ -441,7 +461,7 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create();
for (int i = 0; i < 10; i++) {
@@ -451,7 +471,7 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
Consumer<GenericRecord> consumer = pulsarClient
.newConsumer(Schema.AUTO_CONSUME())
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -470,19 +490,23 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
consumer.acknowledgeCumulative(msg);
consumer.close();
- SchemaRegistry.SchemaAndMetadata storedSchema =
pulsar.getSchemaRegistryService()
- .getSchema("my-property/my-ns/my-topic1")
+ SchemaRegistry.SchemaAndMetadata storedSchema =
SharedPulsarCluster.get().getPulsarService()
+ .getSchemaRegistryService()
+ .getSchema(schemaKey)
.get();
Assert.assertEquals(storedSchema.schema.getData(),
avroSchema.getSchemaInfo().getSchema());
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --",
"testAvroProducerAndAutoSchemaConsumer");
}
@Test
public void testAvroProducerAndAutoSchemaReader() throws Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --",
"testAvroProducerAndAutoSchemaReader");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
@@ -490,7 +514,7 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create();
for (int i = 0; i < 10; i++) {
@@ -500,7 +524,7 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
Reader<GenericRecord> reader = pulsarClient
.newReader(Schema.AUTO_CONSUME())
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.startMessageId(MessageId.earliest)
.create();
@@ -517,19 +541,23 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
// Acknowledge the consumption of all messages at once
reader.close();
- SchemaRegistry.SchemaAndMetadata storedSchema =
pulsar.getSchemaRegistryService()
- .getSchema("my-property/my-ns/my-topic1")
+ SchemaRegistry.SchemaAndMetadata storedSchema =
SharedPulsarCluster.get().getPulsarService()
+ .getSchemaRegistryService()
+ .getSchema(schemaKey)
.get();
Assert.assertEquals(storedSchema.schema.getData(),
avroSchema.getSchemaInfo().getSchema());
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --",
"testAvroProducerAndAutoSchemaReader");
}
@Test
public void testAutoBytesProducer() throws Exception {
- log.info("-- Starting {} test --", methodName);
+ log.info("-- Starting {} test --", "testAutoBytesProducer");
+
+ final String topic = newTopicName();
+ final String schemaKey = topic.replace("persistent://", "");
AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
@@ -537,7 +565,7 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
try (Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create()) {
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
@@ -547,7 +575,7 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
try (Producer<byte[]> producer = pulsarClient
.newProducer(Schema.AUTO_PRODUCE_BYTES())
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create()) {
// try to produce junk data
for (int i = 10; i < 20; i++) {
@@ -572,7 +600,7 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
Consumer<GenericRecord> consumer = pulsarClient
.newConsumer(Schema.AUTO_CONSUME())
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.subscriptionName("my-subscriber-name")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -591,19 +619,20 @@ public class SimpleTypedProducerConsumerTest extends
ProducerConsumerBase {
consumer.acknowledgeCumulative(msg);
consumer.close();
- SchemaRegistry.SchemaAndMetadata storedSchema =
pulsar.getSchemaRegistryService()
- .getSchema("my-property/my-ns/my-topic1")
+ SchemaRegistry.SchemaAndMetadata storedSchema =
SharedPulsarCluster.get().getPulsarService()
+ .getSchemaRegistryService()
+ .getSchema(schemaKey)
.get();
Assert.assertEquals(storedSchema.schema.getData(),
avroSchema.getSchemaInfo().getSchema());
- log.info("-- Exiting {} test --", methodName);
+ log.info("-- Exiting {} test --", "testAutoBytesProducer");
}
@Test
public void testMessageBuilderLoadConf() throws Exception {
- String topic = BrokerTestUtil.newUniqueName("my-topic");
+ String topic = newTopicName();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
index 0eff19d31a3..56bf88c6429 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -22,36 +22,21 @@ import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class CompactedOutBatchMessageTest extends ProducerConsumerBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class CompactedOutBatchMessageTest extends SharedPulsarBaseTest {
@Test
public void testCompactedOutMessages() throws Exception {
- final String topic1 = "persistent://my-property/my-ns/my-topic";
+ final String topic1 = newTopicName();
BrokerEntryMetadata brokerEntryMetadata = new
BrokerEntryMetadata().setBrokerTimestamp(1).setBrokerTimestamp(1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
index ef1c993642b..e64871f9d75 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java
@@ -18,33 +18,18 @@
*/
package org.apache.pulsar.client.impl;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class ConsumeBaseExceptionTest extends ProducerConsumerBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ConsumeBaseExceptionTest extends SharedPulsarBaseTest {
@Test
public void testClosedConsumer() throws PulsarClientException {
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/topicName")
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(newTopicName())
.subscriptionName("my-subscription").subscribe();
consumer.close();
Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
@@ -62,7 +47,7 @@ public class ConsumeBaseExceptionTest extends
ProducerConsumerBase {
@Test
public void testListener() throws PulsarClientException {
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/topicName")
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(newTopicName())
.subscriptionName("my-subscription").messageListener((consumer1, msg) -> {
}).subscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
index b9355d19c27..6d29e30f46b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
@@ -21,36 +21,20 @@ package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.awaitility.Awaitility;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-api")
-public class ConsumerCloseTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ConsumerCloseTest extends SharedPulsarBaseTest {
@Test
public void testReceiveWillDoneAfterClosedConsumer() throws Exception {
- String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ String tpName = newTopicName();
String subName = "test-sub";
admin.topics().createNonPartitionedTopic(tpName);
admin.topics().createSubscription(tpName, subName, MessageId.earliest);
@@ -65,10 +49,10 @@ public class ConsumerCloseTest extends ProducerConsumerBase {
@Test
public void testReceiveWillDoneAfterTopicDeleted() throws Exception {
- String namespace = "public/default";
+ String namespace = getNamespace();
admin.namespaces().setAutoTopicCreation(namespace,
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(false).build());
- String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ String tpName = newTopicName();
String subName = "test-sub";
admin.topics().createNonPartitionedTopic(tpName);
admin.topics().createSubscription(tpName, subName, MessageId.earliest);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
index 612af814d3a..d5fd0bf1a18 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
@@ -22,32 +22,16 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
-import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class ConsumerDedupPermitsUpdateTest extends ProducerConsumerBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ConsumerDedupPermitsUpdateTest extends SharedPulsarBaseTest {
@DataProvider(name = "combinations")
public Object[][] combinations() {
@@ -65,7 +49,7 @@ public class ConsumerDedupPermitsUpdateTest extends ProducerConsumerBase {
@Test(timeOut = 30000, dataProvider = "combinations")
public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) throws Exception {
- String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic");
+ String topic = newTopicName();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
index cf647eeda2e..f3bf80a646c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
@@ -20,40 +20,25 @@ package org.apache.pulsar.client.impl;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
@Slf4j
-public class ConsumerMemoryLimitTest extends ProducerConsumerBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ConsumerMemoryLimitTest extends SharedPulsarBaseTest {
@Test
public void testConsumerMemoryLimit() throws Exception {
String topic = newTopicName();
ClientBuilder clientBuilder = PulsarClient.builder()
- .serviceUrl(pulsar.getBrokerServiceUrl())
+ .serviceUrl(getBrokerServiceUrl())
.memoryLimit(10, SizeUnit.KILO_BYTES);
@Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java
index 878a368e473..f989a60d36c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/DispatchAccordingPermitsTest.java
@@ -19,36 +19,21 @@
package org.apache.pulsar.client.impl;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class DispatchAccordingPermitsTest extends ProducerConsumerBase {
-
- @Override
- @BeforeMethod
- public void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- public void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class DispatchAccordingPermitsTest extends SharedPulsarBaseTest {
/**
* The test case is to simulate dispatch batches with different batch size to the consumer.
@@ -59,7 +44,7 @@ public class DispatchAccordingPermitsTest extends ProducerConsumerBase {
*/
@Test
public void testFlowPermitsWithMultiBatchesDispatch() throws PulsarAdminException, PulsarClientException {
- final String topic = "persistent://public/default/testFlowPermitsWithMultiBatchesDispatch";
+ final String topic = newTopicName();
final String subName = "test";
admin.topics().createSubscription(topic, "test", MessageId.earliest);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
index f7e6f1c60d2..133cf1ad28f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/HierarchyTopicAutoCreationTest.java
@@ -19,44 +19,27 @@
package org.apache.pulsar.client.impl;
import java.util.List;
-import java.util.UUID;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
@Slf4j
-public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
-
- @Override
- @BeforeMethod
- protected void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class HierarchyTopicAutoCreationTest extends SharedPulsarBaseTest {
@Test(invocationCount = 3)
@SneakyThrows
public void testPartitionedTopicAutoCreation() {
- // Create namespace
- final String namespace = "public/testPartitionedTopicAutoCreation";
- admin.namespaces().createNamespace(namespace);
+ final String namespace = getNamespace();
// Set policies
final AutoTopicCreationOverride expectedPolicies = AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
@@ -69,7 +52,8 @@ public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
.getAutoTopicCreation(namespace);
Assert.assertEquals(nsAutoTopicCreationOverride, expectedPolicies);
// Background invalidate cache
- final MetadataCache<Policies> nsCache = pulsar.getPulsarResources().getNamespaceResources().getCache();
+ final MetadataCache<Policies> nsCache = SharedPulsarCluster.get().getPulsarService()
+ .getPulsarResources().getNamespaceResources().getCache();
@Cleanup("interrupt")
final Thread t1 = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
@@ -79,7 +63,7 @@ public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
t1.start();
// trigger auto-creation
- final String topicName = "persistent://" + namespace + "/test-" + UUID.randomUUID();
+ final String topicName = newTopicName();
@Cleanup final Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@@ -89,7 +73,8 @@ public class HierarchyTopicAutoCreationTest extends ProducerConsumerBase {
TopicName.get(topicName).getPartition(0).toString()); // expect partitioned topic
// double-check policies
- final AutoTopicCreationOverride actualPolicies2 = admin.namespaces().getAutoTopicCreation(namespace);
+ final AutoTopicCreationOverride actualPolicies2 = admin.namespaces()
+ .getAutoTopicCreation(namespace);
Assert.assertEquals(actualPolicies2, expectedPolicies);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
index bb69c0daefc..ff72f23b0a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java
@@ -27,9 +27,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MockBrokerService;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -42,25 +42,21 @@ import org.testng.annotations.Test;
@Test(groups = "broker-impl")
@Slf4j
-public class ProduceWithMessageIdTest extends ProducerConsumerBase {
+public class ProduceWithMessageIdTest extends SharedPulsarBaseTest {
MockBrokerService mockBrokerService;
@BeforeClass(alwaysRun = true)
- public void setup() throws Exception {
+ public void setupMockBroker() throws Exception {
mockBrokerService = new MockBrokerService();
mockBrokerService.start();
- super.internalSetup();
- super.producerBaseSetup();
}
- @Override
@AfterClass(alwaysRun = true)
- public void cleanup() throws Exception {
+ public void cleanupMockBroker() throws Exception {
if (mockBrokerService != null) {
mockBrokerService.stop();
mockBrokerService = null;
}
- super.internalCleanup();
}
@Test
@@ -81,7 +77,7 @@ public class ProduceWithMessageIdTest extends ProducerConsumerBase {
.serviceUrl(mockBrokerService.getBrokerAddress())
.build();
- String topic = "persistent://public/default/t1";
+ String topic = newTopicName();
ProducerImpl<byte[]> producer =
(ProducerImpl<byte[]>) client.newProducer().topic(topic).enableBatching(false).create();
@@ -129,7 +125,7 @@ public class ProduceWithMessageIdTest extends ProducerConsumerBase {
int batchSize = 10;
- String topic = "persistent://public/default/testSendWithCallBack";
+ String topic = newTopicName();
ProducerImpl<byte[]> producer =
(ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topic)
.enableBatching(true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
index 2856de00bf6..68f0f6efedb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicFromMessageTest.java
@@ -21,31 +21,38 @@ package org.apache.pulsar.client.impl;
import com.google.common.collect.Lists;
import java.util.HashSet;
import java.util.Set;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.apache.pulsar.broker.service.SharedPulsarCluster;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class TopicFromMessageTest extends ProducerConsumerBase {
+public class TopicFromMessageTest extends SharedPulsarBaseTest {
private static final long TEST_TIMEOUT = 90000; // 1.5 min
private static final int BATCHING_MAX_MESSAGES_THRESHOLD = 2;
@Override
- @BeforeMethod
- public void setup() throws Exception {
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- public void cleanup() throws Exception {
- super.internalCleanup();
+ @BeforeClass(alwaysRun = true)
+ public void setupSharedCluster() throws Exception {
+ super.setupSharedCluster();
+ // These tests use short topic names (e.g. "topic1") which resolve to public/default
+ try {
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Set.of(), Set.of(SharedPulsarCluster.CLUSTER_NAME)));
+ } catch (Exception e) {
+ // tenant may already exist
+ }
+ try {
+ admin.namespaces().createNamespace("public/default",
+ Set.of(SharedPulsarCluster.CLUSTER_NAME));
+ } catch (Exception e) {
+ // namespace may already exist
+ }
}
@Test(timeOut = TEST_TIMEOUT)
@@ -61,12 +68,13 @@ public class TopicFromMessageTest extends ProducerConsumerBase {
@Test(timeOut = TEST_TIMEOUT)
public void testSingleTopicConsumerNoBatchFullName() throws Exception {
+ final String topic = newTopicName();
try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
- .topic("my-property/my-ns/topic1").subscriptionName("sub1").subscribe();
+ .topic(topic).subscriptionName("sub1").subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
- .topic("my-property/my-ns/topic1").enableBatching(false).create()) {
+ .topic(topic).enableBatching(false).create()) {
producer.send("foobar".getBytes());
- Assert.assertEquals(consumer.receive().getTopicName(), "persistent://my-property/my-ns/topic1");
+ Assert.assertEquals(consumer.receive().getTopicName(), topic);
}
}