This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5ddc94993fd [cleanup] Convert 15 test classes to SharedPulsarBaseTest
(#25318)
5ddc94993fd is described below
commit 5ddc94993fd0c437286b95f3e224bf52c20c5c35
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 16 08:56:34 2026 -0700
[cleanup] Convert 15 test classes to SharedPulsarBaseTest (#25318)
---
.../service/BatchMessageBrokerRestartTest.java | 120 +++++++++
.../pulsar/broker/service/BatchMessageTest.java | 279 ++++++---------------
.../BatchMessageWithBatchIndexLevelTest.java | 10 +-
.../BrokerServiceBundlesCacheInvalidationTest.java | 28 +--
.../broker/service/ConsumedLedgersTrimTest.java | 110 ++++----
.../service/CurrentLedgerRolloverIfFullTest.java | 22 +-
.../broker/service/ExclusiveProducerTest.java | 42 ++--
.../apache/pulsar/broker/service/KeyValueTest.java | 29 +--
.../pulsar/broker/service/PartitionKeyTest.java | 24 +-
.../pulsar/broker/service/ResendRequestTest.java | 129 ++++------
.../broker/service/SharedPulsarBaseTest.java | 33 ++-
.../pulsar/broker/service/SharedPulsarCluster.java | 8 +-
.../broker/service/TopicTerminationTest.java | 42 ++--
.../broker/service/persistent/ChecksumTest.java | 24 +-
.../PartitionKeywordCompatibilityTest.java | 41 ++-
.../broker/service/persistent/ShadowTopicTest.java | 61 ++---
.../pulsar/client/impl/MessageChecksumTest.java | 79 ++----
.../impl/UnAcknowledgedMessagesTimeoutTest.java | 93 +++----
.../pulsar/client/impl/ZeroQueueSizeTest.java | 94 +++----
19 files changed, 513 insertions(+), 755 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageBrokerRestartTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageBrokerRestartTest.java
new file mode 100644
index 00000000000..eb032257930
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageBrokerRestartTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests batch message behavior across broker restarts.
+ * This test requires stopping and starting the broker, so it cannot use
SharedPulsarCluster.
+ *
+ * <p>The single test creates the subscription up front, stops the broker, issues asynchronous sends,
+ * starts the broker again, and then checks both the subscription backlog (counted in entries) and the
+ * order of the received payloads. It runs once per batcher builder supplied by the data provider.
+ */
+@Test(groups = "broker")
+public class BatchMessageBrokerRestartTest extends BrokerTestBase {
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @DataProvider(name = "containerBuilder")
+ public Object[][] containerBuilderProvider() {
+ return new Object[][] {
+ { BatcherBuilder.DEFAULT },
+ { BatcherBuilder.KEY_BASED }
+ };
+ }
+
+ @Test(dataProvider = "containerBuilder")
+ public void
testSimpleBatchProducerWithStoppingAndStartingBroker(BatcherBuilder builder)
throws Exception {
+ // Send enough messages to trigger one batch by size and then have a
remaining message in the batch container
+ int numMsgs = 3;
+ int numMsgsInBatch = 2;
+ final String topicName =
"persistent://prop/ns-abc/testBatchBrokerRestart-" + System.nanoTime(); // nanoTime suffix varies the topic name between invocations
+ final String subscriptionName = "syncsub-1";
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .subscribe();
+ consumer.close(); // subscribe-then-close: the subscription is created before anything is published; its backlog is asserted below
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+ .batchingMaxMessages(numMsgsInBatch)
+ .enableBatching(true)
+ .batcherBuilder(builder)
+ .create();
+
+ stopBroker(); // the sendAsync calls below are issued while the broker is stopped
+
+ List<CompletableFuture<MessageId>> messages = new ArrayList<>();
+ for (int i = 0; i < numMsgs; i++) {
+ byte[] message = ("my-message-" + i).getBytes();
+ messages.add(producer.sendAsync(message));
+ }
+
+ startBroker(); // the pending send futures are only awaited after the broker is started again
+
+ // Fail if any one message fails to get acknowledged
+ FutureUtil.waitForAll(messages).get(30, TimeUnit.SECONDS);
+
+ Awaitility.await().timeout(30, TimeUnit.SECONDS)
+ .until(() ->
pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // wait for the topic reference so the get() below does not hit an empty Optional
+
+ PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+
+ rolloverPerIntervalStats();
+
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
2); // 3 messages with numMsgsInBatch = 2: one full batch plus one entry for the remaining message
+ consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+ for (int i = 0; i < numMsgs; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ String receivedMessage = new String(msg.getData());
+ String expectedMessage = "my-message-" + i; // payloads must arrive in the order they were sent
+ Assert.assertEquals(receivedMessage, expectedMessage,
+ "Received message " + receivedMessage + " did not match
the expected message " + expectedMessage);
+ }
+ consumer.close();
+ producer.close();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 05de288639d..860435fdd9e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -37,9 +37,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
-import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
-import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
@@ -53,32 +50,22 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class BatchMessageTest extends BrokerTestBase {
+public class BatchMessageTest extends SharedPulsarBaseTest {
- private static final Logger log =
LoggerFactory.getLogger(BatchMessageTest.class);
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.baseSetup();
- }
+ private static final Logger LOG =
LoggerFactory.getLogger(BatchMessageTest.class);
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
+ private long getBacklog(String topicName, String subscriptionName) throws
Exception {
+ return admin.topics().getStats(topicName)
+ .getSubscriptions().get(subscriptionName).getMsgBacklog();
}
@DataProvider(name = "codecAndContainerBuilder")
@@ -115,8 +102,7 @@ public class BatchMessageTest extends BrokerTestBase {
throws Exception {
int numMsgs = 50;
int numMsgsInBatch = numMsgs / 2;
- final String topicName =
"persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-"
- + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "sub-1" + compressionType.toString();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -137,13 +123,9 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
- // we expect 2 messages in the backlog since we sent 50 messages with
the batch size set to 25. We have set the
- // batch time high enough for it to not affect the number of messages
in the batch
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
2);
+ // we expect 2 entries in the backlog since we sent 50 messages with
the batch size set to 25.
+ // We have set the batch time high enough for it to not affect the
number of messages in the batch
+ assertEquals(getBacklog(topicName, subscriptionName), 2);
consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
for (int i = 0; i < numMsgs; i++) {
@@ -163,8 +145,7 @@ public class BatchMessageTest extends BrokerTestBase {
throws Exception {
int numMsgs = 50;
int numBytesInBatch = 600;
- final String topicName =
"persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSize-"
- + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "sub-1" + compressionType.toString();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -188,13 +169,8 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
- // we expect 2 messages in the backlog since we sent 50 messages with
the batch size set to 25. We have set the
- // batch time high enough for it to not affect the number of messages
in the batch
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
2);
+ // we expect 2 entries in the backlog since the 50 messages are batched
by size in bytes (numBytesInBatch = 600), not by a fixed message count.
+ assertEquals(getBacklog(topicName, subscriptionName), 2);
consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
for (int i = 0; i < numMsgs; i++) {
@@ -213,8 +189,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testSimpleBatchProducerWithFixedBatchTime(CompressionType
compressionType, BatcherBuilder builder)
throws Exception {
int numMsgs = 100;
- final String topicName =
"persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchTime-"
- + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "time-sub-1" +
compressionType.toString();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -236,13 +211,9 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
- LOG.info("Sent {} messages, backlog is {} messages", numMsgs,
-
topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
-
assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false)
< numMsgs);
+ long backlog = getBacklog(topicName, subscriptionName);
+ LOG.info("Sent {} messages, backlog is {} entries", numMsgs, backlog);
+ assertTrue(backlog < numMsgs);
producer.close();
}
@@ -251,8 +222,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void
testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType
compressionType,
BatcherBuilder builder) throws Exception {
int numMsgs = 100;
- final String topicName =
"persistent://prop/ns-abc/testSimpleBatchProducerWithFixedBatchSizeAndTime-"
- + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "time-size-sub-1" +
compressionType.toString();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -274,13 +244,9 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
- LOG.info("Sent {} messages, backlog is {} messages", numMsgs,
-
topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
-
assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false)
< numMsgs);
+ long backlog2 = getBacklog(topicName, subscriptionName);
+ LOG.info("Sent {} messages, backlog is {} entries", numMsgs, backlog2);
+ assertTrue(backlog2 < numMsgs);
producer.close();
}
@@ -290,7 +256,7 @@ public class BatchMessageTest extends BrokerTestBase {
throws Exception {
int numMsgs = 50;
int numMsgsInBatch = numMsgs / 2;
- final String topicName =
"persistent://prop/ns-abc/testBatchProducerWithLargeMessage-" +
UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "large-message-sub-1" +
compressionType.toString();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
@@ -322,13 +288,9 @@ public class BatchMessageTest extends BrokerTestBase {
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
- // we expect 3 messages in the backlog since the large message in the
middle should
+ // we expect 3 entries in the backlog since the large message in the
middle should
// close out the batch and be sent in a batch of its own
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
3);
+ assertEquals(getBacklog(topicName, subscriptionName), 3);
consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
@@ -341,8 +303,8 @@ public class BatchMessageTest extends BrokerTestBase {
LOG.info("received msg size: {}", msg.getData().length);
consumer.acknowledge(msg);
}
- Thread.sleep(100);
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
0);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(getBacklog(topicName, subscriptionName), 0));
consumer.close();
producer.close();
}
@@ -352,7 +314,7 @@ public class BatchMessageTest extends BrokerTestBase {
throws Exception {
int numMsgs = 500;
int numMsgsInBatch = numMsgs / 20;
- final String topicName =
"persistent://prop/ns-abc/testSimpleBatchProducerConsumer-" + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "pc-sub-1" +
compressionType.toString();
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -381,12 +343,7 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
- numMsgs / numMsgsInBatch);
+ assertEquals(getBacklog(topicName, subscriptionName), numMsgs /
numMsgsInBatch);
consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Message<byte[]> lastunackedMsg = null;
@@ -402,8 +359,8 @@ public class BatchMessageTest extends BrokerTestBase {
if (lastunackedMsg != null) {
consumer.acknowledgeCumulative(lastunackedMsg);
}
- Thread.sleep(100);
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
0);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(getBacklog(topicName, subscriptionName), 0));
consumer.close();
producer.close();
}
@@ -412,8 +369,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder
builder) throws Exception {
int numMsgs = 10;
int numMsgsInBatch = numMsgs / 2;
- final String topicName =
"persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-"
- + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "syncsub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -431,67 +387,9 @@ public class BatchMessageTest extends BrokerTestBase {
producer.send(message);
}
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
- // we would expect 2 messages in the backlog since we sent 10 messages
with the batch size set to 5.
+ // we would expect 2 entries in the backlog since we sent 10 messages
with the batch size set to 5.
// However, we are using synchronous send and so each message will go
as an individual message
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
10);
- consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
-
- for (int i = 0; i < numMsgs; i++) {
- Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
- assertNotNull(msg);
- String receivedMessage = new String(msg.getData());
- String expectedMessage = "my-message-" + i;
- Assert.assertEquals(receivedMessage, expectedMessage,
- "Received message " + receivedMessage + " did not match
the expected message " + expectedMessage);
- }
- consumer.close();
- producer.close();
- }
-
- @Test(dataProvider = "containerBuilder")
- public void
testSimpleBatchProducerWithStoppingAndStartingBroker(BatcherBuilder builder)
throws Exception {
- // Send enough messages to trigger one batch by size and then have a
remaining message in the batch container
- int numMsgs = 3;
- int numMsgsInBatch = 2;
- final String topicName =
"persistent://prop/ns-abc/testSimpleBatchSyncProducerWithFixedBatchSize-"
- + UUID.randomUUID();
- final String subscriptionName = "syncsub-1";
-
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
- .subscribe();
- consumer.close();
-
- Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
- .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
- .batchingMaxMessages(numMsgsInBatch)
- .enableBatching(true)
- .batcherBuilder(builder)
- .create();
-
- stopBroker();
-
- List<CompletableFuture<MessageId>> messages = new ArrayList<>();
- for (int i = 0; i < numMsgs; i++) {
- byte[] message = ("my-message-" + i).getBytes();
- messages.add(producer.sendAsync(message));
- }
-
- startBroker();
-
- // Fail if any one message fails to get acknowledged
- FutureUtil.waitForAll(messages).get(30, TimeUnit.SECONDS);
-
- Awaitility.await().timeout(30, TimeUnit.SECONDS)
- .until(() ->
pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
2);
+ assertEquals(getBacklog(topicName, subscriptionName), 10);
consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
for (int i = 0; i < numMsgs; i++) {
@@ -510,8 +408,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder
builder) throws Exception {
int numMsgs = 2000;
int numMsgsInBatch = 4;
- final String topicName =
"persistent://prop/ns-abc/testSimpleBatchProducerConsumer1kMessages-"
- + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "pc1k-sub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -542,13 +439,7 @@ public class BatchMessageTest extends BrokerTestBase {
}
LOG.info("[{}] sent {} messages", subscriptionName, numMsgs);
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- // allow stats to be updated..
- LOG.info("[{}] checking backlog stats..", topic);
- rolloverPerIntervalStats();
- assertEquals(topic.getSubscription(subscriptionName)
- .getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
+ assertEquals(getBacklog(topicName, subscriptionName), numMsgs /
numMsgsInBatch);
consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Message<byte[]> lastunackedMsg = null;
@@ -561,9 +452,10 @@ public class BatchMessageTest extends BrokerTestBase {
consumer.acknowledgeCumulative(lastunackedMsg);
}
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(getBacklog(topicName, subscriptionName), 0));
consumer.close();
producer.close();
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
0);
}
// test for ack holes
@@ -576,7 +468,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testOutOfOrderAcksForBatchMessage() throws Exception {
int numMsgs = 40;
int numMsgsInBatch = numMsgs / 4;
- final String topicName =
"persistent://prop/ns-abc/testOutOfOrderAcksForBatchMessage";
+ final String topicName = newTopicName();
final String subscriptionName = "oooack-sub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -595,11 +487,7 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
- assertEquals(topic.getSubscription(subscriptionName)
- .getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
+ assertEquals(getBacklog(topicName, subscriptionName), numMsgs /
numMsgsInBatch);
consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Set<Integer> individualAcks = new HashSet<>();
for (int i = 15; i < 20; i++) {
@@ -617,24 +505,21 @@ public class BatchMessageTest extends BrokerTestBase {
} else if (i == 14) {
// should ack lid =0 eid = 1 on broker
consumer.acknowledgeCumulative(msg);
- Thread.sleep(1000);
- rolloverPerIntervalStats();
- Thread.sleep(1000);
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
3);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(getBacklog(topicName, subscriptionName),
3));
} else if (individualAcks.contains(i)) {
consumer.acknowledge(msg);
} else {
lastunackedMsg = msg;
}
}
- Thread.sleep(1000);
- rolloverPerIntervalStats();
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
2);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(getBacklog(topicName, subscriptionName), 2));
if (lastunackedMsg != null) {
consumer.acknowledgeCumulative(lastunackedMsg);
}
- Thread.sleep(100);
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
0);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(getBacklog(topicName, subscriptionName), 0));
consumer.close();
producer.close();
}
@@ -643,8 +528,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder
builder) throws Exception {
int numMsgs = 10;
int numMsgsInBatch = numMsgs;
- final String topicName =
"persistent://prop/ns-abc/testNonBatchCumulativeAckAfterBatchPublish-"
- + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "nbcaabp-sub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -671,11 +555,7 @@ public class BatchMessageTest extends BrokerTestBase {
byte[] nobatchmsg = ("nobatch").getBytes();
noBatchProducer.sendAsync(nobatchmsg).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
2);
+ assertEquals(getBacklog(topicName, subscriptionName), 2);
consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Message<byte[]> lastunackedMsg = null;
@@ -685,9 +565,8 @@ public class BatchMessageTest extends BrokerTestBase {
lastunackedMsg = msg;
}
consumer.acknowledgeCumulative(lastunackedMsg);
- Thread.sleep(100);
- rolloverPerIntervalStats();
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
0);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(getBacklog(topicName, subscriptionName), 0));
consumer.close();
producer.close();
noBatchProducer.close();
@@ -697,7 +576,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder)
throws Exception {
int numMsgs = 50;
int numMsgsInBatch = numMsgs / 10;
- final String topicName =
"persistent://prop/ns-abc/testBatchAndNonBatchCumulativeAcks-" +
UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "bnb-sub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -726,11 +605,7 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
- rolloverPerIntervalStats();
-
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn
> 0.0);
-
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
+ assertEquals(getBacklog(topicName, subscriptionName),
(numMsgs / 2) / numMsgsInBatch + numMsgs / 2);
consumer = pulsarClient.newConsumer()
.topic(topicName)
@@ -753,8 +628,8 @@ public class BatchMessageTest extends BrokerTestBase {
}
consumer.acknowledgeCumulative(lastunackedMsg);
- retryStrategically(t -> topic.getSubscription(subscriptionName)
- .getNumberOfEntriesInBacklog(false) == 0, 100, 100);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(getBacklog(topicName, subscriptionName), 0));
consumer.close();
producer.close();
@@ -769,7 +644,7 @@ public class BatchMessageTest extends BrokerTestBase {
@Test(dataProvider = "containerBuilder")
public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws
Exception {
int numMsgs = 10;
- final String topicName = "persistent://prop/ns-abc/testConcurrentAck-"
+ UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "sub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -789,11 +664,8 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
-
final Consumer<byte[]> myConsumer =
pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscribe();
- // assertEquals(dispatcher.getTotalUnackedMessages(), 1);
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(10);
@@ -812,11 +684,12 @@ public class BatchMessageTest extends BrokerTestBase {
}
latch.await();
- AbstractPersistentDispatcherMultipleConsumers dispatcher =
(AbstractPersistentDispatcherMultipleConsumers) topic
- .getSubscription(subscriptionName).getDispatcher();
// check strategically to let ack-message receive by broker
- retryStrategically((test) ->
dispatcher.getConsumers().get(0).getUnackedMessages() == 0, 50, 150);
- assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
+ Awaitility.await().untilAsserted(() -> {
+ long unacked =
admin.topics().getStats(topicName).getSubscriptions()
+
.get(subscriptionName).getConsumers().get(0).getUnackedMessages();
+ assertEquals(unacked, 0);
+ });
executor.shutdownNow();
myConsumer.close();
@@ -826,7 +699,7 @@ public class BatchMessageTest extends BrokerTestBase {
@Test
public void testOrderingOfKeyBasedBatchMessageContainer()
throws PulsarClientException, ExecutionException,
InterruptedException {
- final String topicName = "persistent://prop/ns-abc/testKeyBased";
+ final String topicName = newTopicName();
final String subscriptionName = "sub-1";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
@@ -892,7 +765,7 @@ public class BatchMessageTest extends BrokerTestBase {
@Test(dataProvider = "containerBuilder")
public void testBatchSendOneMessage(BatcherBuilder builder) throws
Exception {
- final String topicName =
"persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "sub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -926,7 +799,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws
Exception {
int numMsgs = 10;
- final String topicName =
"persistent://prop/ns-abc/testRetrieveSequenceIdGenerated-" + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "sub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -959,7 +832,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testRetrieveSequenceIdSpecify(BatcherBuilder builder) throws
Exception {
int numMsgs = 10;
- final String topicName =
"persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "sub-1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
@@ -992,7 +865,7 @@ public class BatchMessageTest extends BrokerTestBase {
public void testSendOverSizeMessage(CompressionType compressionType,
BatcherBuilder builder) throws Exception {
final int numMsgs = 10;
- final String topicName =
"persistent://prop/ns-abc/testSendOverSizeMessage-" + UUID.randomUUID();
+ final String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
@@ -1021,8 +894,7 @@ public class BatchMessageTest extends BrokerTestBase {
int numMsgs = 1000;
int batchMessages = 10;
- final String topicName =
"persistent://prop/ns-abc/testBatchMessageDispatchingAccordingToPermits-"
- + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "bmdap-sub-1";
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
@@ -1059,7 +931,7 @@ public class BatchMessageTest extends BrokerTestBase {
private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType
subType,
boolean
enableBatch) throws Exception {
final int messageCount = 50;
- final String topicName =
"persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
+ final String topicName = newTopicName();
final String subscriptionName = "sub-batch-1";
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
@@ -1081,7 +953,7 @@ public class BatchMessageTest extends BrokerTestBase {
CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
producer.sendAsync((i + "").getBytes()).thenAccept(msgId -> {
- log.info("Published message with msgId: {}", msgId);
+ LOG.info("Published message with msgId: {}", msgId);
countDownLatch.countDown();
});
// To generate batch message with different batch size
@@ -1111,26 +983,19 @@ public class BatchMessageTest extends BrokerTestBase {
}
}
- String topic = TopicName.get(topicName).toString();
- PersistentSubscription persistentSubscription =
(PersistentSubscription) pulsar.getBrokerService()
- .getTopic(topic,
false).get().get().getSubscription(subscriptionName);
-
Awaitility.await().untilAsserted(() -> {
+ long unacked = admin.topics().getStats(topicName).getSubscriptions()
+ .get(subscriptionName).getConsumers().get(0).getUnackedMessages();
if (subType == SubscriptionType.Shared) {
if (enableBatch) {
- if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) {
-
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(),
5 * 1);
- } else {
-
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(),
5 * 3);
- }
+ // acknowledgmentAtBatchIndexLevelEnabled defaults to true
+ assertEquals(unacked, 5 * 1);
} else {
-
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(),
messageCount / 2);
+ assertEquals(unacked, messageCount / 2);
}
} else {
-
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(),
0);
+ assertEquals(unacked, 0);
}
});
}
-
- private static final Logger LOG =
LoggerFactory.getLogger(BatchMessageTest.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 417b87acb09..faa49c4c930 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -58,20 +58,28 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker")
-public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
+public class BatchMessageWithBatchIndexLevelTest extends BrokerTestBase {
@BeforeClass
@Override
protected void setup() throws Exception {
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
super.baseSetup();
}
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
@Test
@SneakyThrows
public void testBatchMessageAck() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java
index 1144c989819..1d90a2c49fd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceBundlesCacheInvalidationTest.java
@@ -23,32 +23,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.BundlesData;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Slf4j
-public class BrokerServiceBundlesCacheInvalidationTest extends BrokerTestBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.baseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class BrokerServiceBundlesCacheInvalidationTest extends SharedPulsarBaseTest {
@Test
public void testRecreateNamespace() throws Exception {
- String namespace = "prop/test-" + System.nanoTime();
- String topic = namespace + "/my-topic";
-
- // First create namespace with 20 bundles
- admin.namespaces().createNamespace(namespace, 20);
+ String namespace = getNamespace();
+ String topic = "persistent://" + namespace + "/my-topic";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
@@ -59,11 +42,12 @@ public class BrokerServiceBundlesCacheInvalidationTest extends BrokerTestBase {
// Delete and recreate with 32 bundles
admin.topics().delete(topic);
- deleteNamespaceWithRetry(namespace, false);
+ admin.namespaces().deleteNamespace(namespace, true);
+
admin.namespaces().createNamespace(namespace, 32);
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
log.info("BUNDLES: {}", admin.namespaces().getBundles(namespace));
assertEquals(bundlesData.getNumBundles(), 32);
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index 6f47c2c16b9..4548392377f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -36,41 +36,24 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class ConsumedLedgersTrimTest extends BrokerTestBase {
+public class ConsumedLedgersTrimTest extends SharedPulsarBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(ConsumedLedgersTrimTest.class);
- @Override
- protected void setup() throws Exception {
- //No-op
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
- super.conf.setDefaultRetentionSizeInMB(-1);
- super.conf.setDefaultRetentionTimeInMinutes(-1);
- }
-
@Test
public void testConsumedLedgersTrim() throws Exception {
- conf.setRetentionCheckIntervalInSeconds(1);
- super.baseSetup();
- final String topicName =
"persistent://prop/ns-abc/TestConsumedLedgersTrim";
+ // Set infinite retention at namespace level so ledgers are preserved until explicitly trimmed
+ admin.namespaces().setRetention(getNamespace(), new RetentionPolicies(-1, -1));
+
+ final String topicName = newTopicName();
final String subscriptionName = "my-subscriber-name";
@Cleanup
@@ -81,9 +64,9 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
@Cleanup
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
- Topic topicRef =
pulsar.getBrokerService().getTopicReference(topicName).get();
+ Topic topicRef = getTopicReference(topicName).get();
Assert.assertNotNull(topicRef);
- PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ PersistentTopic persistentTopic = (PersistentTopic)
getTopic(topicName, true).get().get();
ManagedLedgerConfig managedLedgerConfig =
persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionSizeInMB(1L);
@@ -101,7 +84,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() -
1, msgNum / 2);
});
- //no traffic, unconsumed ledger will be retained
+ // no traffic, unconsumed ledger will be retained
Thread.sleep(1200);
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size() - 1,
msgNum / 2);
@@ -111,18 +94,22 @@ public class ConsumedLedgersTrimTest extends
BrokerTestBase {
consumer.acknowledge(msg);
}
- //no traffic, but consumed ledger will be cleaned
+ // Explicitly trigger trim instead of relying on the broker's retention check timer
Thread.sleep(1500);
+ CompletableFuture<Void> trimFuture = new CompletableFuture<>();
+ managedLedger.trimConsumedLedgersInBackground(trimFuture);
+ trimFuture.join();
+
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
}
@Test
public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
- conf.setRetentionCheckIntervalInSeconds(1);
- conf.setBrokerDeleteInactiveTopicsEnabled(false);
- super.baseSetup();
- final String topicName =
"persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions";
+ // Set infinite retention at namespace level so ledgers are preserved until explicitly trimmed
+ admin.namespaces().setRetention(getNamespace(), new RetentionPolicies(-1, -1));
+
+ final String topicName = newTopicName();
// write some messages
@Cleanup
@@ -133,7 +120,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase
{
// set retention parameters, the ledgers are to be deleted as soon as
possible
// but the topic is not to be automatically deleted
- PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ PersistentTopic persistentTopic = (PersistentTopic)
getTopic(topicName, true).get().get();
ManagedLedgerConfig managedLedgerConfig =
persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionSizeInMB(-1);
managedLedgerConfig.setRetentionTime(-1, TimeUnit.SECONDS);
@@ -149,20 +136,22 @@ public class ConsumedLedgersTrimTest extends
BrokerTestBase {
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
- MessageId messageIdBeforeRestart =
pulsar.getAdminClient().topics().getLastMessageId(topicName);
- LOG.info("messageIdBeforeRestart " + messageIdBeforeRestart);
- assertNotEquals(messageIdBeforeRestart, initialMessageId);
-
- // restart the broker we have to start a new ledger
- // the lastMessageId is still on the previous ledger
- restartBroker();
- // force load topic
- pulsar.getAdminClient().topics().getStats(topicName);
- MessageId messageIdAfterRestart =
pulsar.getAdminClient().topics().getLastMessageId(topicName);
- LOG.info("lastmessageid " + messageIdAfterRestart);
- assertEquals(messageIdAfterRestart, messageIdBeforeRestart);
-
- persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ MessageId messageIdBeforeUnload =
admin.topics().getLastMessageId(topicName);
+ LOG.info("messageIdBeforeUnload " + messageIdBeforeUnload);
+ assertNotEquals(messageIdBeforeUnload, initialMessageId);
+
+ // Unload and close the producer to force a new ledger when the topic is reloaded.
+ // The lastMessageId should still refer to the previous ledger.
+ producer.close();
+ admin.topics().unload(topicName);
+
+ // Force-load the topic again
+ admin.topics().getStats(topicName);
+ MessageId messageIdAfterUnload =
admin.topics().getLastMessageId(topicName);
+ LOG.info("lastmessageid " + messageIdAfterUnload);
+ assertEquals(messageIdAfterUnload, messageIdBeforeUnload);
+
+ persistentTopic = (PersistentTopic) getTopic(topicName,
true).get().get();
managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
// now we have two ledgers, the first is expired but is contains the
lastMessageId
// the second is empty and should be kept as it is the current tail
@@ -181,7 +170,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase
{
// lastMessageId should be available even in this case, but is must
// refer to -1
- MessageId messageIdAfterTrim =
pulsar.getAdminClient().topics().getLastMessageId(topicName);
+ MessageId messageIdAfterTrim =
admin.topics().getLastMessageId(topicName);
LOG.info("lastmessageid " + messageIdAfterTrim);
assertEquals(messageIdAfterTrim, MessageId.earliest);
@@ -189,31 +178,29 @@ public class ConsumedLedgersTrimTest extends
BrokerTestBase {
@Test
public void testAdminTrimLedgers() throws Exception {
- conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE / 2);
- conf.setDefaultNumberOfNamespaceBundles(1);
- super.baseSetup();
- final String topicName =
"persistent://prop/ns-abc/TestAdminTrimLedgers" + UUID.randomUUID();
final String subscriptionName = "my-sub";
final int maxEntriesPerLedger = 2;
final int partitionedNum = 3;
- admin.topics().createPartitionedTopic(topicName, partitionedNum);
+ String partitionedTopic = "persistent://" + getNamespace()
+ + "/trim-ledgers-" + UUID.randomUUID().toString().substring(0, 8);
+ admin.topics().createPartitionedTopic(partitionedTopic, partitionedNum);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(topicName)
+ .topic(partitionedTopic)
.enableBatching(false)
.producerName("producer-name")
.create();
@Cleanup
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
- .subscribe();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(partitionedTopic)
+ .subscriptionName(subscriptionName).subscribe();
for (int i = 0; i < partitionedNum; i++) {
- String topic = TopicName.get(topicName).getPartition(i).toString();
- Topic topicRef =
pulsar.getBrokerService().getTopicReference(topic).get();
+ String topic =
TopicName.get(partitionedTopic).getPartition(i).toString();
+ Topic topicRef = getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
}
- PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
-
.getTopicReference(TopicName.get(topicName).getPartition(0).toString()).get();
+ PersistentTopic persistentTopic = (PersistentTopic) getTopicReference(
+
TopicName.get(partitionedTopic).getPartition(0).toString()).get();
ManagedLedgerConfig managedLedgerConfig =
persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionSizeInMB(-1);
managedLedgerConfig.setRetentionTime(1, TimeUnit.MILLISECONDS);
@@ -231,7 +218,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase
{
consumer.acknowledge(msg);
}
//consumed ledger should be cleaned
- admin.topics().trimTopic(topicName);
+ admin.topics().trimTopic(partitionedTopic);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1));
@@ -239,8 +226,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase
{
@Test
public void trimNonPersistentTopic() throws Exception {
- super.baseSetup();
- String topicName =
"non-persistent://prop/ns-abc/trimNonPersistentTopic" + UUID.randomUUID();
+ String topicName = "non-persistent://" + getNamespace() + "/trimNonPersistentTopic" + UUID.randomUUID();
int partitionedNum = 3;
admin.topics().createPartitionedTopic(topicName, partitionedNum);
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index 4f83d25a292..226456df597 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -30,28 +30,14 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- baseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- internalCleanup();
- }
+public class CurrentLedgerRolloverIfFullTest extends SharedPulsarBaseTest {
@Test
public void testCurrentLedgerRolloverIfFull() throws Exception {
- final String topicName =
"persistent://prop/ns-abc/CurrentLedgerRolloverIfFullTest";
+ final String topicName = newTopicName();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
@@ -65,9 +51,9 @@ public class CurrentLedgerRolloverIfFullTest extends
BrokerTestBase {
.subscriptionName("CurrentLedgerRolloverIfFullTest-subscriber-name")
.subscribe();
- Topic topicRef =
pulsar.getBrokerService().getTopicReference(topicName).get();
+ Topic topicRef = getTopicReference(topicName).get();
Assert.assertNotNull(topicRef);
- PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+ PersistentTopic persistentTopic = (PersistentTopic)
getTopic(topicName, true).get().get();
ManagedLedgerConfig managedLedgerConfig =
persistentTopic.getManagedLedger().getConfig();
managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index c108d5b8ca1..1acb7917ac6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.util.HashedWheelTimer;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -39,25 +40,11 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class ExclusiveProducerTest extends BrokerTestBase {
-
- @BeforeClass
- protected void setup() throws Exception {
- // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
- isTcpLookup = true;
- baseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- protected void cleanup() throws Exception {
- internalCleanup();
- }
+public class ExclusiveProducerTest extends SharedPulsarBaseTest {
@DataProvider(name = "topics")
public static Object[][] topics() {
@@ -150,7 +137,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
@Cleanup
PulsarClient pulsarClient2 = PulsarClient.builder()
- .serviceUrl(pulsar.getBrokerServiceUrl())
+ .serviceUrl(getBrokerServiceUrl())
.operationTimeout(2, TimeUnit.SECONDS)
.build();
@@ -222,13 +209,21 @@ public class ExclusiveProducerTest extends BrokerTestBase
{
@Test(dataProvider = "topics")
public void testProducerTasksCleanupWhenUsingExclusiveProducers(String
type, boolean partitioned) throws Exception {
String topic = newTopic(type, partitioned);
- Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+
+ // Use a dedicated client so we can assert zero pending timeouts without interference
+ // from operations on the shared client.
+ @Cleanup
+ PulsarClient dedicatedClient = PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+ .build();
+
+ Producer<String> p1 = dedicatedClient.newProducer(Schema.STRING)
.topic(topic)
.accessMode(ProducerAccessMode.Exclusive)
.create();
try {
- pulsarClient.newProducer(Schema.STRING)
+ dedicatedClient.newProducer(Schema.STRING)
.topic(topic)
.accessMode(ProducerAccessMode.Exclusive)
.create();
@@ -239,7 +234,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
p1.close();
- HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
pulsarClient).timer();
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) dedicatedClient).timer();
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(timer.pendingTimeouts(), 0));
}
@@ -322,13 +317,13 @@ public class ExclusiveProducerTest extends BrokerTestBase
{
// Simulate a producer that takes over and fences p1 through the topic
epoch
if (!partitioned) {
- Topic t = pulsar.getBrokerService().getTopic(topic,
false).get().get();
+ Topic t = getTopic(topic, false).get().get();
CompletableFuture<?> f = ((AbstractTopic)
t).incrementTopicEpoch(Optional.of(0L));
f.get();
} else {
for (int i = 0; i < 3; i++) {
String name = TopicName.get(topic).getPartition(i).toString();
- Topic t = pulsar.getBrokerService().getTopic(name,
false).get().get();
+ Topic t = getTopic(name, false).get().get();
CompletableFuture<?> f = ((AbstractTopic)
t).incrementTopicEpoch(Optional.of(0L));
f.get();
}
@@ -423,7 +418,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
@Cleanup
PulsarClient client = PulsarClient.builder()
- .serviceUrl(pulsar.getBrokerServiceUrl())
+ .serviceUrl(getBrokerServiceUrl())
.operationTimeout(1, TimeUnit.SECONDS)
.build();
@@ -463,7 +458,8 @@ public class ExclusiveProducerTest extends BrokerTestBase {
}
private String newTopic(String type, boolean isPartitioned) throws
Exception {
- String topic = type + "://" + newTopicName();
+ String topicSuffix = UUID.randomUUID().toString().substring(0, 8);
+ String topic = type + "://" + getNamespace() + "/topic-" + topicSuffix;
if (isPartitioned) {
admin.topics().createPartitionedTopic(topic, 3);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
index fe18e810a4e..c440bf9d162 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
@@ -34,32 +34,18 @@ import
org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
- * Null value message produce and consume test.
+ * KeyValue schema produce and consume test.
*/
@Slf4j
@Test(groups = "broker")
-public class KeyValueTest extends BrokerTestBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.baseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class KeyValueTest extends SharedPulsarBaseTest {
@Test
- public void keyValueAutoConsumeTest() throws Exception {
- String topic = "persistent://prop/ns-abc/kv-record";
+ public void keyValueAutoConsumeTest() throws Exception {
+ String topic = newTopicName();
admin.topics().createNonPartitionedTopic(topic);
RecordSchemaBuilder builder = SchemaBuilder
@@ -86,11 +72,8 @@ public class KeyValueTest extends BrokerTestBase {
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
-
Message<KeyValue<GenericRecord, GenericRecord>> message =
consumer.receive();
- assertEquals(key.getField("test"),
message.getValue().getKey().getField("test"));
- assertEquals(value.getField("test"),
message.getValue().getValue().getField("test"));
-
+ assertEquals(message.getValue().getKey().getField("test"), key.getField("test"));
+ assertEquals(message.getValue().getValue().getField("test"), value.getField("test"));
}
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PartitionKeyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PartitionKeyTest.java
index 80aa5f198cc..2114aa8a5de 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PartitionKeyTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PartitionKeyTest.java
@@ -20,39 +20,27 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import lombok.Cleanup;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class PartitionKeyTest extends BrokerTestBase {
-
- @BeforeMethod
- @Override
- public void setup() throws Exception {
- super.baseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- public void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class PartitionKeyTest extends SharedPulsarBaseTest {
@Test(timeOut = 10000)
public void testPartitionKey() throws Exception {
- final String topicName = "persistent://prop/ns-abc/testPartitionKey";
+ final String topicName = newTopicName();
+ @Cleanup
org.apache.pulsar.client.api.Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscription").subscribe();
// 1. producer with batch enabled
+ @Cleanup
Producer<byte[]> producerWithBatches =
pulsarClient.newProducer().topic(topicName).enableBatching(true)
.create();
-
// 2. Producer without batches
Producer<byte[]> producerWithoutBatches =
pulsarClient.newProducer().topic(topicName).create();
@@ -70,7 +58,5 @@ public class PartitionKeyTest extends BrokerTestBase {
consumer.acknowledge(msg);
}
-
}
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index 2b6ff1575e0..e4d3863ecdb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -40,33 +41,18 @@ import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class ResendRequestTest extends BrokerTestBase {
+public class ResendRequestTest extends SharedPulsarBaseTest {
private static final long testTimeout = 60000; // 1 min
private static final Logger log =
LoggerFactory.getLogger(ResendRequestTest.class);
- @BeforeMethod
- @Override
- public void setup() throws Exception {
- super.baseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- public void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@Test(timeOut = testTimeout)
public void testExclusiveSingleAckedNormalTopic() throws Exception {
- String key = "testExclusiveSingleAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
HashSet<MessageId> messageIdHashSet = new HashSet<>();
@@ -78,7 +64,7 @@ public class ResendRequestTest extends BrokerTestBase {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
- PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentTopic topicRef = (PersistentTopic)
getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
@@ -156,17 +142,16 @@ public class ResendRequestTest extends BrokerTestBase {
@Test(timeOut = testTimeout)
public void testSharedSingleAckedNormalTopic() throws Exception {
- String key = "testSharedSingleAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-shared-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-shared-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
- PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentTopic topicRef = (PersistentTopic)
getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
@@ -175,7 +160,8 @@ public class ResendRequestTest extends BrokerTestBase {
.subscriptionName(subscriptionName).receiverQueueSize(totalMessages / 2)
.subscriptionType(SubscriptionType.Shared).subscribe();
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0); // Creates new client connection
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> consumer2 =
newPulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(totalMessages / 2)
.subscriptionType(SubscriptionType.Shared).subscribe();
@@ -235,7 +221,6 @@ public class ResendRequestTest extends BrokerTestBase {
message2 = consumer2.receive(200, TimeUnit.MILLISECONDS);
} while (message1 != null || message2 != null);
log.info("Additional received = " + receivedMessagesAfterRedelivery);
- newPulsarClient.close();
assertTrue(receivedMessagesAfterRedelivery > 0);
assertEquals(receivedConsumer1 + receivedConsumer2, totalMessages);
@@ -243,17 +228,16 @@ public class ResendRequestTest extends BrokerTestBase {
@Test(timeOut = testTimeout)
public void testFailoverSingleAckedNormalTopic() throws Exception {
- String key = "testFailoverSingleAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-failover-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-failover-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
- PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentTopic topicRef = (PersistentTopic)
getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
@@ -361,10 +345,9 @@ public class ResendRequestTest extends BrokerTestBase {
@Test(timeOut = testTimeout)
public void testExclusiveCumulativeAckedNormalTopic() throws Exception {
- String key = "testExclusiveCumulativeAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
// 1. producer connect
@@ -373,7 +356,7 @@ public class ResendRequestTest extends BrokerTestBase {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
- PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentTopic topicRef = (PersistentTopic)
getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
@@ -418,10 +401,9 @@ public class ResendRequestTest extends BrokerTestBase {
@Test(timeOut = testTimeout)
public void testExclusiveSingleAckedPartitionedTopic() throws Exception {
- String key = "testExclusiveSingleAckedPartitionedTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
final int numberOfPartitions = 4;
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -471,10 +453,9 @@ public class ResendRequestTest extends BrokerTestBase {
@Test(timeOut = testTimeout)
public void testSharedSingleAckedPartitionedTopic() throws Exception {
- String key = "testSharedSingleAckedPartitionedTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-shared-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-shared-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
final int numberOfPartitions = 3;
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -489,7 +470,8 @@ public class ResendRequestTest extends BrokerTestBase {
Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
- PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0); // Creates new client connection
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient();
Consumer<byte[]> consumer2 =
newPulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.receiverQueueSize(7).subscriptionType(SubscriptionType.Shared).subscribe();
@@ -529,14 +511,14 @@ public class ResendRequestTest extends BrokerTestBase {
message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
} while (message1 != null || message2 != null);
- log.info(key + " messageCount1 = " + messageCount1);
- log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
+ log.info("messageCount1 = " + messageCount1);
+ log.info("messageCount2 = " + messageCount2);
+ log.info("ackCount1 = " + ackCount1);
+ log.info("ackCount2 = " + ackCount2);
assertEquals(messageCount1 + messageCount2, totalMessages);
// 5. Ask for redeliver
- log.info(key + ": Sent a Redeliver Message Request");
+ log.info("Sent a Redeliver Message Request");
consumer1.redeliverUnacknowledgedMessages();
if ((ackCount1 + ackCount2) == totalMessages) {
return;
@@ -559,20 +541,18 @@ public class ResendRequestTest extends BrokerTestBase {
message2 = consumer2.receive(1000, TimeUnit.MILLISECONDS);
} while (message1 != null || message2 != null);
- log.info(key + " messageCount1 = " + messageCount1);
- log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
- newPulsarClient.close();
+ log.info("messageCount1 = " + messageCount1);
+ log.info("messageCount2 = " + messageCount2);
+ log.info("ackCount1 = " + ackCount1);
+ log.info("ackCount2 = " + ackCount2);
assertEquals(messageCount1 + messageCount2 + ackCount1, totalMessages);
}
@Test(timeOut = testTimeout)
public void testFailoverSingleAckedPartitionedTopic() throws Exception {
- String key = "testFailoverSingleAckedPartitionedTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-failover-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-failover-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
final int numberOfPartitions = 3;
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -625,16 +605,16 @@ public class ResendRequestTest extends BrokerTestBase {
message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
} while (message1 != null || message2 != null);
- log.info(key + " messageCount1 = " + messageCount1);
- log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
+ log.info("messageCount1 = " + messageCount1);
+ log.info("messageCount2 = " + messageCount2);
+ log.info("ackCount1 = " + ackCount1);
+ log.info("ackCount2 = " + ackCount2);
assertEquals(messageCount1 + messageCount2, totalMessages);
if ((ackCount1 + ackCount2) == totalMessages) {
return;
}
// 5. Ask for redeliver
- log.info(key + ": Sent a Redeliver Message Request");
+ log.info("Sent a Redeliver Message Request");
consumer1.redeliverUnacknowledgedMessages();
consumer1.close();
@@ -648,26 +628,25 @@ public class ResendRequestTest extends BrokerTestBase {
}
message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
} while (message2 != null);
- log.info(key + " messageCount1 = " + messageCount1);
- log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
+ log.info("messageCount1 = " + messageCount1);
+ log.info("messageCount2 = " + messageCount2);
+ log.info("ackCount1 = " + ackCount1);
+ log.info("ackCount2 = " + ackCount2);
assertEquals(messageCount2 + ackCount1, totalMessages);
}
@Test(timeOut = testTimeout)
public void testFailoverInactiveConsumer() throws Exception {
- String key = "testFailoverInactiveConsumer";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-failover-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-failover-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
// 1. producer connect
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
- PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentTopic topicRef = (PersistentTopic)
getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);
@@ -737,7 +716,7 @@ public class ResendRequestTest extends BrokerTestBase {
Field field = ConsumerBase.class.getDeclaredField("incomingMessages");
field.setAccessible(true);
imq = (GrowableArrayBlockingQueue<Message<byte[]>>) field.get(c);
- log.info("Incoming MEssage Queue: {}", imq.toList());
+ log.info("Incoming Message Queue: {}", imq.toList());
return imq;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
index dc04e513cca..17d2fdbfaf2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -50,13 +52,13 @@ public abstract class SharedPulsarBaseTest {
protected PulsarAdmin admin;
protected PulsarClient pulsarClient;
- private String namespace;
+ private final List<String> namespaces = new ArrayList<>();
/**
* Returns the unique namespace assigned to the current test method.
*/
protected String getNamespace() {
- return namespace;
+ return namespaces.get(0);
}
/**
@@ -133,21 +135,15 @@ public abstract class SharedPulsarBaseTest {
*/
@BeforeMethod(alwaysRun = true)
public void setupSharedTest() throws Exception {
- String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
- String ns = SharedPulsarCluster.TENANT_NAME + "/" + nsName;
- namespace = ns;
- admin.namespaces().createNamespace(ns,
Set.of(SharedPulsarCluster.CLUSTER_NAME));
- log.info("Created test namespace: {}", ns);
+ createNewNamespace();
}
/**
- * Force-deletes the namespace created by {@link #setupSharedTest()},
including all topics in it.
+ * Force-deletes all namespaces created during the test method.
*/
@AfterMethod(alwaysRun = true)
public void cleanupSharedTest() throws Exception {
- String ns = namespace;
- if (ns != null) {
- namespace = null;
+ for (String ns : namespaces) {
try {
admin.namespaces().deleteNamespace(ns, true);
log.info("Deleted test namespace: {}", ns);
@@ -155,12 +151,25 @@ public abstract class SharedPulsarBaseTest {
log.warn("Failed to delete namespace {}: {}", ns,
e.getMessage());
}
}
+ namespaces.clear();
+ }
+
+ /**
+ * Creates a new namespace under the shared tenant and registers it for
automatic cleanup.
+ */
+ protected String createNewNamespace() throws Exception {
+ String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
+ String ns = SharedPulsarCluster.TENANT_NAME + "/" + nsName;
+ admin.namespaces().createNamespace(ns,
Set.of(SharedPulsarCluster.CLUSTER_NAME));
+ namespaces.add(ns);
+ log.info("Created test namespace: {}", ns);
+ return ns;
}
/**
* Generates a unique persistent topic name within the current test
namespace.
*/
protected String newTopicName() {
- return "persistent://" + namespace + "/topic-" +
UUID.randomUUID().toString().substring(0, 8);
+ return "persistent://" + getNamespace() + "/topic-" +
UUID.randomUUID().toString().substring(0, 8);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index f12ba8b98b1..41ff4eb20cd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -148,13 +148,13 @@ public class SharedPulsarCluster {
// Reduce thread pool sizes for faster startup (fewer threads to
create)
config.setNumIOThreads(2);
- config.setNumOrderedExecutorThreads(1);
+ config.setNumOrderedExecutorThreads(2);
config.setNumHttpServerThreads(4);
- config.setBookkeeperClientNumWorkerThreads(1);
+ config.setBookkeeperClientNumWorkerThreads(2);
config.setBookkeeperClientNumIoThreads(2);
config.setNumCacheExecutorThreadPoolSize(1);
- config.setManagedLedgerNumSchedulerThreads(1);
- config.setTopicOrderedExecutorThreadNum(2);
+ config.setManagedLedgerNumSchedulerThreads(2);
+ config.setTopicOrderedExecutorThreadNum(4);
// Disable the load balancer — single-broker cluster doesn't need it
config.setLoadBalancerEnabled(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 40379420637..a751f1ad743 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -46,31 +47,14 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class TopicTerminationTest extends BrokerTestBase {
-
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
- isTcpLookup = true;
- super.baseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
- private final String topicName = "persistent://prop/ns-abc/topic0";
+public class TopicTerminationTest extends SharedPulsarBaseTest {
@Test
public void testSimpleTermination() throws Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -93,6 +77,7 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(groups = "broker")
public void testCreateProducerOnTerminatedTopic() throws Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -114,6 +99,7 @@ public class TopicTerminationTest extends BrokerTestBase {
}
public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws
Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -139,6 +125,8 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testTerminateWhilePublishing() throws Exception {
+ String topicName = newTopicName();
+ @Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -188,6 +176,7 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(groups = "broker")
public void testDoubleTerminate() throws Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -207,6 +196,7 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(groups = "broker")
public void testTerminatePartitionedTopic() throws Exception {
+ String topicName = newTopicName();
admin.topics().createPartitionedTopic(topicName, 4);
try {
@@ -219,11 +209,12 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testSimpleTerminationConsumer() throws Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
- org.apache.pulsar.client.api.Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName)
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").subscribe();
MessageId msgId1 = producer.send("test-msg-1".getBytes());
@@ -256,6 +247,7 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testSimpleTerminationMessageListener() throws Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -263,7 +255,7 @@ public class TopicTerminationTest extends BrokerTestBase {
CountDownLatch latch = new CountDownLatch(1);
- org.apache.pulsar.client.api.Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName)
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").messageListener(new
MessageListener<byte[]>() {
@Override
@@ -295,6 +287,7 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testSimpleTerminationReader() throws Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -326,6 +319,7 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testSimpleTerminationReaderListener() throws Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -362,6 +356,7 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testSubscribeOnTerminatedTopic() throws Exception {
+ String topicName = newTopicName();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -372,7 +367,7 @@ public class TopicTerminationTest extends BrokerTestBase {
MessageId lastMessageId =
admin.topics().terminateTopicAsync(topicName).get();
assertEquals(lastMessageId, msgId2);
- org.apache.pulsar.client.api.Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName)
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").subscribe();
Awaitility.await().untilAsserted(() ->
assertTrue(consumer.hasReachedEndOfTopic()));
@@ -380,13 +375,14 @@ public class TopicTerminationTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testSubscribeOnTerminatedTopicWithNoMessages() throws
Exception {
+ String topicName = newTopicName();
pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
admin.topics().terminateTopicAsync(topicName).get();
- org.apache.pulsar.client.api.Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName)
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").subscribe();
Awaitility.await().untilAsserted(() ->
assertTrue(consumer.hasReachedEndOfTopic()));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
index c110512b854..3a313f7b2a6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java
@@ -26,37 +26,23 @@ import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.common.protocol.Commands;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker")
-public class ChecksumTest extends BrokerTestBase {
-
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- super.baseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
+public class ChecksumTest extends SharedPulsarBaseTest {
@Test
public void verifyChecksumStoredInManagedLedger() throws Exception {
- final String topicName = "persistent://prop/ns-abc/topic0";
+ final String topicName = newTopicName();
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
- PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName).get();
ManagedLedger ledger = topic.getManagedLedger();
ManagedCursor cursor = ledger.openCursor("test");
@@ -79,7 +65,7 @@ public class ChecksumTest extends BrokerTestBase {
@Test
public void verifyChecksumSentToConsumer() throws Exception {
- final String topicName = "persistent://prop/ns-abc/topic-1";
+ final String topicName = newTopicName();
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
RawReader reader = RawReader.create(pulsarClient, topicName,
"sub").get();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java
index deb64287406..9642d47585f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import static org.testng.Assert.fail;
import java.util.List;
import lombok.Cleanup;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -30,60 +30,49 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test
-public class PartitionKeywordCompatibilityTest extends BrokerTestBase {
-
- @BeforeClass(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- baseSetup();
- setupDefaultTenantAndNamespace();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- internalCleanup();
- }
+public class PartitionKeywordCompatibilityTest extends SharedPulsarBaseTest {
public void testAutoCreatePartitionTopicWithKeywordAndDeleteIt()
throws PulsarAdminException, PulsarClientException {
+ String namespace = getNamespace();
AutoTopicCreationOverride override =
AutoTopicCreationOverride.builder()
.allowAutoTopicCreation(true)
.topicType("partitioned")
.defaultNumPartitions(1)
.build();
- admin.namespaces().setAutoTopicCreation("public/default", override);
- String topicName = "persistent://public/default/XXX-partition-0-dd";
+ admin.namespaces().setAutoTopicCreation(namespace, override);
+ String topicName = "persistent://" + namespace + "/XXX-partition-0-dd";
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub-1")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
- List<String> topics = admin.topics().getList("public/default");
- List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList("public/default");
+ List<String> topics = admin.topics().getList(namespace);
+ List<String> partitionedTopicList =
admin.topics().getPartitionedTopicList(namespace);
Assert.assertTrue(topics.contains(TopicName.get(topicName).getPartition(0).toString()));
Assert.assertTrue(partitionedTopicList.contains(topicName));
consumer.close();
PartitionedTopicStats stats =
admin.topics().getPartitionedStats(topicName, false);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
admin.topics().deletePartitionedTopic(topicName);
- topics = admin.topics().getList("public/default");
- partitionedTopicList =
admin.topics().getPartitionedTopicList("public/default");
+ topics = admin.topics().getList(namespace);
+ partitionedTopicList =
admin.topics().getPartitionedTopicList(namespace);
Assert.assertFalse(topics.contains(topicName));
Assert.assertFalse(partitionedTopicList.contains(topicName));
}
@Test
public void testDeletePartitionedTopicValidation() throws
PulsarAdminException {
- final String topicName =
"persistent://public/default/testDeletePartitionedTopicValidation";
- final String partitionKeywordTopic =
"persistent://public/default/testDelete-partition-edTopicValidation";
- final String partitionedTopic =
"persistent://public/default/testDeletePartitionedTopicValidation-partition-0";
+ String namespace = getNamespace();
+ final String topicName = "persistent://" + namespace +
"/testDeletePartitionedTopicValidation";
+ final String partitionKeywordTopic = "persistent://" + namespace
+ + "/testDelete-partition-edTopicValidation";
+ final String partitionedTopic = "persistent://" + namespace
+ + "/testDeletePartitionedTopicValidation-partition-0";
try {
admin.topics().deletePartitionedTopic(topicName);
fail("expect not found!");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
index 840454b4e21..0ef638b4b21 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java
@@ -30,7 +30,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -41,54 +41,36 @@ import
org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
-public class ShadowTopicTest extends BrokerTestBase {
-
- @BeforeClass(alwaysRun = true)
- @Override
- protected void setup() throws Exception {
- baseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- internalCleanup();
- }
-
- private String newShadowSourceTopicName() {
- return "persistent://" + newTopicName();
- }
+public class ShadowTopicTest extends SharedPulsarBaseTest {
@Test
public void testNonPartitionedShadowTopicSetup() throws Exception {
- String sourceTopic = newShadowSourceTopicName();
+ String sourceTopic = newTopicName();
String shadowTopic = sourceTopic + "-shadow";
//1. test shadow topic setting in topic creation.
admin.topics().createNonPartitionedTopic(sourceTopic);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
PersistentTopic brokerShadowTopic =
- (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(shadowTopic).get().get();
+ (PersistentTopic) getTopicIfExists(shadowTopic).get().get();
Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof
ShadowManagedLedgerImpl);
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(),
sourceTopic);
Assert.assertEquals(admin.topics().getShadowSource(shadowTopic),
sourceTopic);
//2. test shadow topic could be properly loaded after unload.
- admin.namespaces().unload("prop/ns-abc");
-
Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty());
+ admin.namespaces().unload(getNamespace());
+ Assert.assertTrue(getTopicReference(shadowTopic).isEmpty());
Assert.assertEquals(admin.topics().getShadowSource(shadowTopic),
sourceTopic);
- brokerShadowTopic = (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(shadowTopic).get().get();
+ brokerShadowTopic = (PersistentTopic)
getTopicIfExists(shadowTopic).get().get();
Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof
ShadowManagedLedgerImpl);
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(),
sourceTopic);
}
@Test
public void testPartitionedShadowTopicSetup() throws Exception {
- String sourceTopic = newShadowSourceTopicName();
+ String sourceTopic = newTopicName();
String shadowTopic = sourceTopic + "-shadow";
String sourceTopicPartition =
TopicName.get(sourceTopic).getPartition(0).toString();
String shadowTopicPartition =
TopicName.get(shadowTopic).getPartition(0).toString();
@@ -98,26 +80,25 @@ public class ShadowTopicTest extends BrokerTestBase {
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
pulsarClient.newProducer().topic(shadowTopic).create().close();
//trigger loading partitions.
- PersistentTopic brokerShadowTopic = (PersistentTopic)
pulsar.getBrokerService()
- .getTopicIfExists(shadowTopicPartition).get().get();
+ PersistentTopic brokerShadowTopic = (PersistentTopic)
getTopicIfExists(shadowTopicPartition).get().get();
Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof
ShadowManagedLedgerImpl);
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(),
sourceTopicPartition);
Assert.assertEquals(admin.topics().getShadowSource(shadowTopic),
sourceTopic);
//2. test shadow topic could be properly loaded after unload.
- admin.namespaces().unload("prop/ns-abc");
-
Assert.assertTrue(pulsar.getBrokerService().getTopicReference(shadowTopic).isEmpty());
+ admin.namespaces().unload(getNamespace());
+ Assert.assertTrue(getTopicReference(shadowTopic).isEmpty());
Assert.assertEquals(admin.topics().getShadowSource(shadowTopic),
sourceTopic);
brokerShadowTopic =
- (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(shadowTopicPartition).get().get();
+ (PersistentTopic)
getTopicIfExists(shadowTopicPartition).get().get();
Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof
ShadowManagedLedgerImpl);
Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(),
sourceTopicPartition);
}
@Test
public void testPartitionedShadowTopicProduceAndConsume() throws Exception
{
- String sourceTopic = newShadowSourceTopicName();
+ String sourceTopic = newTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createPartitionedTopic(sourceTopic, 3);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -146,7 +127,7 @@ public class ShadowTopicTest extends BrokerTestBase {
@Test
public void testShadowTopicNotWritable() throws Exception {
- String sourceTopic = newShadowSourceTopicName();
+ String sourceTopic = newTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createNonPartitionedTopic(sourceTopic);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -156,18 +137,19 @@ public class ShadowTopicTest extends BrokerTestBase {
}
private void awaitUntilShadowReplicatorReady(String sourceTopic, String
shadowTopic) {
- Awaitility.await().untilAsserted(()->{
+ Awaitility.await().untilAsserted(() -> {
PersistentTopic sourcePersistentTopic =
- (PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(sourceTopic).get().get();
+ (PersistentTopic)
getTopicIfExists(sourceTopic).get().get();
ShadowReplicator
replicator = (ShadowReplicator)
sourcePersistentTopic.getShadowReplicators().get(shadowTopic);
Assert.assertNotNull(replicator);
Assert.assertEquals(String.valueOf(replicator.getState()),
"Started");
});
}
+
@Test
public void testShadowTopicConsuming() throws Exception {
- String sourceTopic = newShadowSourceTopicName();
+ String sourceTopic = newTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createNonPartitionedTopic(sourceTopic);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -188,7 +170,7 @@ public class ShadowTopicTest extends BrokerTestBase {
@Test
public void testShadowTopicConsumingWithStringSchema() throws Exception {
- String sourceTopic = newShadowSourceTopicName();
+ String sourceTopic = newTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createNonPartitionedTopic(sourceTopic);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -220,9 +202,10 @@ public class ShadowTopicTest extends BrokerTestBase {
int x;
int y;
}
+
@Test
public void testShadowTopicConsumingWithJsonSchema() throws Exception {
- String sourceTopic = newShadowSourceTopicName();
+ String sourceTopic = newTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createNonPartitionedTopic(sourceTopic);
admin.topics().createShadowTopic(shadowTopic, sourceTopic);
@@ -244,7 +227,7 @@ public class ShadowTopicTest extends BrokerTestBase {
@Test
public void testConsumeShadowMessageWithoutCache() throws Exception {
- String sourceTopic = newShadowSourceTopicName();
+ String sourceTopic = newTopicName();
String shadowTopic = sourceTopic + "-shadow";
admin.topics().createNonPartitionedTopic(sourceTopic);
@Cleanup Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
index c475e8aa61c..61e09664ea1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java
@@ -26,16 +26,14 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.client.api.ClientBuilder;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
@@ -45,40 +43,14 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.tests.EnumValuesDataProvider;
-import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class MessageChecksumTest extends BrokerTestBase {
+public class MessageChecksumTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(MessageChecksumTest.class);
- @BeforeMethod
- @Override
- public void setup() throws Exception {
- baseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- internalCleanup();
- }
-
- @Override
- protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
- // disable connection pooling
- clientBuilder.connectionsPerBroker(0);
- }
-
- @Override
- protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder)
throws PulsarClientException {
- return PulsarTestClient.create(clientBuilder);
- }
-
// Enum parameter used to describe the 2 different scenarios in the
// testChecksumCompatibilityInMixedVersionBrokerCluster test case
enum MixedVersionScenario {
@@ -114,31 +86,31 @@ public class MessageChecksumTest extends BrokerTestBase {
@Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider =
"values")
public void
testChecksumCompatibilityInMixedVersionBrokerCluster(MixedVersionScenario
mixedVersionScenario)
throws Exception {
- // GIVEN
- final String topicName =
-
"persistent://prop/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling";
+ final String topicName = newTopicName();
+
+ // Create a PulsarTestClient with connection pooling disabled
+ @Cleanup
+ PulsarTestClient pulsarTestClient = (PulsarTestClient)
PulsarTestClient.create(
+ PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+ .connectionsPerBroker(0));
if (mixedVersionScenario ==
MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION) {
// Given, the client thinks it's connected to a broker that
doesn't support message checksums
- makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
+
pulsarTestClient.setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
}
- PulsarTestClient pulsarTestClient = (PulsarTestClient) pulsarClient;
-
- ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarTestClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ Consumer<byte[]> consumer = pulsarTestClient.newConsumer()
.topic(topicName)
.subscriptionName("my-sub")
.subscribe();
- // inject a CountDownLatch to the pending message callback of the
PulsarTestClient
- CountDownLatch messageSendingProcessedLatch = new CountDownLatch(2);
-
// WHEN
// a message is sent, it should succeed
producer.send("message-1".getBytes());
@@ -169,10 +141,10 @@ public class MessageChecksumTest extends BrokerTestBase {
if (mixedVersionScenario ==
MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION) {
// Given, the client thinks it's connected to a broker that
doesn't support message checksums
- makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
+
pulsarTestClient.setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
} else {
// Reset the overriding set in the beginning
- resetOverridingConnectedBrokerVersion();
+ pulsarTestClient.setOverrideRemoteEndpointProtocolVersion(0);
}
// And
@@ -199,28 +171,11 @@ public class MessageChecksumTest extends BrokerTestBase {
assertEquals(new String(msg.getData()), "message-3");
}
- private void
makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport() {
- // make the client think that the connected broker is of version which
doesn't support checksum validation
- ((PulsarTestClient)
pulsarClient).setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
- }
-
- private void resetOverridingConnectedBrokerVersion() {
- // reset the override and use the actual protocol version
- ((PulsarTestClient)
pulsarClient).setOverrideRemoteEndpointProtocolVersion(0);
- }
-
- private void
waitUntilMessageIsPendingWithCalculatedChecksum(ProducerImpl<?> producer) {
- // wait until the message is in the pending queue
- Awaitility.await().untilAsserted(() -> {
- assertEquals(producer.getPendingQueueSize(), 1);
- });
- }
-
@Test
public void testTamperingMessageIsDetected() throws Exception {
// GIVEN
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
-
.topic("persistent://prop/ns-abc/testTamperingMessageIsDetected")
+ .topic(newTopicName())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index 0883c329b21..4393e2f5ec2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -23,9 +23,8 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.HashSet;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -37,29 +36,15 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase {
+public class UnAcknowledgedMessagesTimeoutTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(UnAcknowledgedMessagesTimeoutTest.class);
private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
- @Override
- @BeforeMethod
- public void setup() throws Exception {
- super.baseSetup();
- }
-
- @Override
- @AfterMethod(alwaysRun = true)
- public void cleanup() throws Exception {
- super.internalCleanup();
- }
-
@DataProvider(name = "variationsRedeliveryTracker")
public static Object[][] variationsRedeliveryTracker() {
return new Object[][]{
@@ -71,10 +56,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
@Test(dataProvider = "variationsRedeliveryTracker")
public void testExclusiveSingleAckedNormalTopic(boolean
isRedeliveryTracker) throws Exception {
- String key = "testExclusiveSingleAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
// 1. producer connect
@@ -110,7 +94,7 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
message = consumer.receive(500, TimeUnit.MILLISECONDS);
}
long size = ((ConsumerImpl<?>)
consumer).getUnAckedMessageTracker().size();
- log.info(key + " Unacked Message Tracker size is " + size);
+ log.info(" Unacked Message Tracker size is " + size);
assertEquals(size, totalMessages / 2);
// Blocking call, redeliver should kick in
@@ -131,17 +115,16 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
} while (message != null);
size = ((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().size();
- log.info(key + " Unacked Message Tracker size is " + size);
+ log.info(" Unacked Message Tracker size is " + size);
assertEquals(size, 0);
assertEquals(hSet.size(), totalMessages);
}
@Test(dataProvider = "variationsRedeliveryTracker")
public void testExclusiveCumulativeAckedNormalTopic(boolean
isRedeliveryTracker) throws Exception {
- String key = "testExclusiveCumulativeAckedNormalTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
// 1. producer connect
@@ -194,10 +177,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
@Test(dataProvider = "variationsRedeliveryTracker")
public void testSharedSingleAckedPartitionedTopic(boolean
isRedeliveryTracker) throws Exception {
- String key = "testSharedSingleAckedPartitionedTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-shared-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-shared-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 20;
final int numberOfPartitions = 3;
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -248,36 +230,36 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
int ackCount1 = 0;
int ackCount2 = messageCount2;
- log.info(key + " messageCount1 = " + messageCount1);
- log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
+ log.info(" messageCount1 = " + messageCount1);
+ log.info(" messageCount2 = " + messageCount2);
+ log.info(" ackCount1 = " + ackCount1);
+ log.info(" ackCount2 = " + ackCount2);
assertEquals(messageCount1 + messageCount2, totalMessages);
// 5. Check if Messages redelivered again
// Since receive is a blocking call hoping that timeout will kick in
Thread.sleep((int) (ackTimeOutMillis * 1.1));
- log.info(key + " Timeout should be triggered now");
+ log.info(" Timeout should be triggered now");
messageCount1 = receiveAllMessage(consumer1, true);
messageCount2 += receiveAllMessage(consumer2, false);
ackCount1 = messageCount1;
- log.info(key + " messageCount1 = " + messageCount1);
- log.info(key + " messageCount2 = " + messageCount2);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
+ log.info(" messageCount1 = " + messageCount1);
+ log.info(" messageCount2 = " + messageCount2);
+ log.info(" ackCount1 = " + ackCount1);
+ log.info(" ackCount2 = " + ackCount2);
assertEquals(messageCount1 + messageCount2, totalMessages);
assertEquals(ackCount1 + messageCount2, totalMessages);
Thread.sleep((int) (ackTimeOutMillis * 2));
// Since receive is a blocking call hoping that timeout will kick in
- log.info(key + " Timeout should be triggered again");
+ log.info(" Timeout should be triggered again");
ackCount1 += receiveAllMessage(consumer1, true);
ackCount2 += receiveAllMessage(consumer2, true);
- log.info(key + " ackCount1 = " + ackCount1);
- log.info(key + " ackCount2 = " + ackCount2);
+ log.info(" ackCount1 = " + ackCount1);
+ log.info(" ackCount2 = " + ackCount2);
assertEquals(ackCount1 + ackCount2, totalMessages);
}
@@ -300,10 +282,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
@Test(dataProvider = "variationsRedeliveryTracker")
public void testFailoverSingleAckedPartitionedTopic(boolean
isRedeliveryTracker) throws Exception {
- String key = "testFailoverSingleAckedPartitionedTopic";
- final String topicName = "persistent://prop/ns-abc/topic-" + key +
UUID.randomUUID().toString();
- final String subscriptionName = "my-failover-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-failover-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 10;
final int numberOfPartitions = 3;
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
@@ -397,7 +378,7 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
// 5. Check if Messages redelivered again
Thread.sleep(ackTimeOutMillis * 2);
- log.info(key + " Timeout should be triggered now");
+ log.info(" Timeout should be triggered now");
messagesReceived = 0;
while (true) {
Message<byte[]> message1 = consumer1.receive(500,
TimeUnit.MILLISECONDS);
@@ -439,10 +420,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
@Test
public void testCheckUnAcknowledgedMessageTimer() throws
PulsarClientException, InterruptedException {
- String key = "testCheckUnAcknowledgedMessageTimer";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 3;
// 1. producer connect
@@ -494,10 +474,9 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
public void testCheckUnAcknowledgedMessageRedeliveryTimer(long
ackTimeOutMillis, long minDelayMs,
long maxDelayMs,
int multiplier)
throws PulsarClientException, InterruptedException {
- String key = "testCheckUnAcknowledgedMessageRedeliveryTimer";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
final int totalMessages = 3;
// 1. producer connect
@@ -544,7 +523,7 @@ public class UnAcknowledgedMessagesTimeoutTest extends
BrokerTestBase {
@Test
public void testSingleMessageBatch() throws Exception {
- String topicName = "prop/ns-abc/topic-estSingleMessageBatch";
+ String topicName = newTopicName();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
index 8f10281bcd0..68a49427bb8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java
@@ -34,8 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -53,27 +52,13 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
-public class ZeroQueueSizeTest extends BrokerTestBase {
+public class ZeroQueueSizeTest extends SharedPulsarBaseTest {
private static final Logger log =
LoggerFactory.getLogger(ZeroQueueSizeTest.class);
private final int totalMessages = 10;
- @BeforeClass
- @Override
- public void setup() throws Exception {
- baseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- internalCleanup();
- }
-
@Test
public void validQueueSizeConfig() {
pulsarClient.newConsumer().receiverQueueSize(0);
@@ -86,9 +71,8 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test(expectedExceptions =
PulsarClientException.InvalidConfigurationException.class)
public void zeroQueueSizeReceiveAsyncInCompatibility() throws
PulsarClientException {
- String key = "zeroQueueSizeReceiveAsyncInCompatibility";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.receiverQueueSize(0).subscribe();
@@ -97,9 +81,8 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test(expectedExceptions = PulsarClientException.class)
public void zeroQueueSizePartitionedTopicInCompatibility() throws
PulsarClientException, PulsarAdminException {
- String key = "zeroQueueSizePartitionedTopicInCompatibility";
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
int numberOfPartitions = 3;
admin.topics().createPartitionedTopic(topicName, numberOfPartitions);
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).receiverQueueSize(0).subscribe();
@@ -107,12 +90,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test
public void zeroQueueSizeNormalConsumer() throws PulsarClientException {
- String key = "nonZeroQueueSizeNormalConsumer";
-
- // 1. Config
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
// 2. Create Producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
@@ -144,12 +124,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test
public void zeroQueueSizeConsumerListener() throws Exception {
- String key = "zeroQueueSizeConsumerListener";
-
- // 1. Config
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
// 2. Create Producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
@@ -188,12 +165,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test
public void zeroQueueSizeSharedSubscription() throws PulsarClientException
{
- String key = "zeroQueueSizeSharedSubscription";
-
- // 1. Config
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
// 2. Create Producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
@@ -229,12 +203,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test
public void zeroQueueSizeFailoverSubscription() throws
PulsarClientException {
- String key = "zeroQueueSizeFailoverSubscription";
-
- // 1. Config
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
// 2. Create Producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
@@ -288,12 +259,13 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
public void testFailedZeroQueueSizeBatchMessage() throws
PulsarClientException {
int batchMessageDelayMs = 100;
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://prop/ns-abc/topic1")
+ String topicName = newTopicName();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(0)
.subscribe();
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
- .topic("persistent://prop/ns-abc/topic1")
+ .topic(topicName)
.messageRoutingMode(MessageRoutingMode.SinglePartition);
if (batchMessageDelayMs != 0) {
@@ -324,7 +296,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test
public void testZeroQueueSizeMessageRedelivery() throws
PulsarClientException {
- final String topic =
"persistent://prop/ns-abc/testZeroQueueSizeMessageRedelivery";
+ final String topic = newTopicName();
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.receiverQueueSize(0)
@@ -356,7 +328,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test
public void testZeroQueueSizeMessageRedeliveryForListener() throws
Exception {
- final String topic =
"persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForListener";
+ final String topic = newTopicName();
final int messages = 10;
final CountDownLatch latch = new CountDownLatch(messages * 2);
Set<Integer> receivedMessages = new HashSet<>();
@@ -394,7 +366,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test
public void testZeroQueueSizeMessageRedeliveryForAsyncReceive()
throws PulsarClientException, ExecutionException,
InterruptedException {
- final String topic =
"persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForAsyncReceive";
+ final String topic = newTopicName();
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.receiverQueueSize(0)
@@ -431,8 +403,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test(timeOut = 30000)
public void testZeroQueueGetExceptionWhenReceiveBatchMessage() throws
PulsarClientException {
- final String topic = BrokerTestUtil.newUniqueName(
-
"persistent://prop/ns-abc/testZeroQueueGetExceptionWhenReceiveBatchMessage-");
+ final String topic = newTopicName();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).receiverQueueSize(0)
.subscribe();
@@ -465,7 +436,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test(timeOut = 30000)
public void testPauseAndResume() throws Exception {
- final String topicName =
"persistent://prop/ns-abc/zero-queue-pause-and-resume";
+ final String topicName = newTopicName();
final String subName = "sub";
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new
CountDownLatch(1));
@@ -501,7 +472,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test(timeOut = 30000)
public void testPauseAndResumeWithUnloading() throws Exception {
- final String topicName =
"persistent://prop/ns-abc/zero-queue-pause-and-resume-with-unloading";
+ final String topicName = newTopicName();
final String subName = "sub";
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new
CountDownLatch(1));
@@ -540,7 +511,7 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test(timeOut = 30000)
public void testPauseAndResumeNoReconnection() throws Exception {
- final String topicName =
"persistent://prop/ns-abc/zero-queue-pause-and-resume-no-reconnection";
+ final String topicName = newTopicName();
final String subName = "sub";
final Object object = new Object();
@@ -589,12 +560,9 @@ public class ZeroQueueSizeTest extends BrokerTestBase {
@Test(timeOut = 30000)
public void
testZeroQueueSizeConsumerWithPayloadProcessorReceiveBatchMessage() throws
Exception {
- String key = "payloadProcessorReceiveBatchMessage";
-
- // 1. Config
- final String topicName = "persistent://prop/ns-abc/topic-" + key;
- final String subscriptionName = "my-ex-subscription-" + key;
- final String messagePredicate = "my-message-" + key + "-";
+ final String topicName = newTopicName();
+ final String subscriptionName = "my-ex-subscription";
+ final String messagePredicate = "my-message-";
// 2. Create Producer
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)