This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aad1ce99ae [fix][test] Fix flaky 
ResendRequestTest.testFailoverSingleAckedNormalTopic (#25343)
2aad1ce99ae is described below

commit 2aad1ce99ae47b52f4bf1549a7bd06ff163c578a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Mar 18 12:10:36 2026 -0700

    [fix][test] Fix flaky ResendRequestTest.testFailoverSingleAckedNormalTopic 
(#25343)
---
 .../org/apache/pulsar/broker/service/ResendRequestTest.java   | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
index e4d3863ecdb..02caf35efa8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
@@ -248,6 +249,16 @@ public class ResendRequestTest extends 
SharedPulsarBaseTest {
         Consumer<byte[]> consumer1 = 
consumerBuilder.clone().consumerName("consumer-1").subscribe();
         Consumer<byte[]> consumer2 = 
consumerBuilder.clone().consumerName("consumer-2").subscribe();
 
+        // Wait for failover consumer assignment to settle so consumer-1 is 
the active consumer
+        Awaitility.await().untilAsserted(() -> {
+            Subscription sub = topicRef.getSubscription(subscriptionName);
+            assertNotNull(sub);
+            AbstractDispatcherSingleActiveConsumer dispatcher =
+                    (AbstractDispatcherSingleActiveConsumer) 
sub.getDispatcher();
+            assertEquals(dispatcher.getConsumers().size(), 2);
+            assertEquals(dispatcher.getActiveConsumer().consumerName(), 
"consumer-1");
+        });
+
         // 3. Producer publishes messages
         for (int i = 0; i < totalMessages; i++) {
             String message = messagePredicate + i;

Reply via email to