This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2aad1ce99ae [fix][test] Fix flaky ResendRequestTest.testFailoverSingleAckedNormalTopic (#25343)
2aad1ce99ae is described below
commit 2aad1ce99ae47b52f4bf1549a7bd06ff163c578a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 18 12:10:36 2026 -0700
[fix][test] Fix flaky ResendRequestTest.testFailoverSingleAckedNormalTopic (#25343)
---
.../org/apache/pulsar/broker/service/ResendRequestTest.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index e4d3863ecdb..02caf35efa8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
@@ -248,6 +249,16 @@ public class ResendRequestTest extends SharedPulsarBaseTest {
Consumer<byte[]> consumer1 =
consumerBuilder.clone().consumerName("consumer-1").subscribe();
Consumer<byte[]> consumer2 =
consumerBuilder.clone().consumerName("consumer-2").subscribe();
+ // Wait for failover consumer assignment to settle so consumer-1 is the active consumer
+ Awaitility.await().untilAsserted(() -> {
+ Subscription sub = topicRef.getSubscription(subscriptionName);
+ assertNotNull(sub);
+ AbstractDispatcherSingleActiveConsumer dispatcher =
+ (AbstractDispatcherSingleActiveConsumer)
sub.getDispatcher();
+ assertEquals(dispatcher.getConsumers().size(), 2);
+ assertEquals(dispatcher.getActiveConsumer().consumerName(),
"consumer-1");
+ });
+
// 3. Producer publishes messages
for (int i = 0; i < totalMessages; i++) {
String message = messagePredicate + i;