This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a2b69ccbc62 [fix][broker]Dispatcher did unnecessary sort for
recentlyJoinedConsumers and printed noisy error logs (#24634)
a2b69ccbc62 is described below
commit a2b69ccbc626192c544e07276bbfaa4195e0b6b5
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 9 19:17:02 2025 +0800
[fix][broker]Dispatcher did unnecessary sort for recentlyJoinedConsumers
and printed noisy error logs (#24634)
---
...tickyKeyDispatcherMultipleConsumersClassic.java | 41 ++++++++++----
...yKeyDispatcherMultipleConsumersClassicTest.java | 64 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
index 56161d8dd15..c3b246fe9ba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
@@ -58,6 +58,7 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.util.FutureUtil;
+import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,10 +96,25 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersClassic
PersistentStickyKeyDispatcherMultipleConsumersClassic(PersistentTopic
topic, ManagedCursor cursor,
Subscription
subscription, ServiceConfiguration conf,
KeySharedMeta ksm) {
+ this(topic, cursor, subscription, conf, ksm, null);
+ }
+
+ /**
+ * @param recentlyJoinedConsumers This parameter is only used for testing.
+ */
+ @VisibleForTesting
+ PersistentStickyKeyDispatcherMultipleConsumersClassic(PersistentTopic
topic, ManagedCursor cursor,
+ Subscription subscription,
ServiceConfiguration conf,
+ KeySharedMeta ksm,
+ @Nullable
LinkedHashMap<Consumer, Position> recentlyJoinedConsumers) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
- this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new
LinkedHashMap<>();
+ if (recentlyJoinedConsumers == null) {
+ this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null :
new LinkedHashMap<>();
+ } else {
+ this.recentlyJoinedConsumers = recentlyJoinedConsumers;
+ }
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
case AUTO_SPLIT:
@@ -166,6 +182,10 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersClassic
});
}
+ /**
+ * Sort items in the collection "recentlyJoinedConsumers" if needed.
+ * Since we check the order of queue after each consumer joined, we can
only check the last two items.
+ */
private void sortRecentlyJoinedConsumersIfNeeded() {
if (!sortRecentlyJoinedConsumersIfNeeded) {
return;
@@ -173,20 +193,21 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersClassic
if (recentlyJoinedConsumers.size() == 1) {
return;
}
- // Since we check the order of queue after each consumer joined, we
can only check the last two items.
boolean sortNeeded = false;
- Position posPre = null;
- Position posAfter = null;
+ Position secondLatest = null;
+ Position latest = null;
for (Map.Entry<Consumer, Position> entry :
recentlyJoinedConsumers.entrySet()) {
- if (posPre == null) {
- posPre = entry.getValue();
+ if (secondLatest == null) {
+ secondLatest = entry.getValue();
+ } else if (latest == null) {
+ latest = entry.getValue();
} else {
- posPre = posAfter;
- posAfter = entry.getValue();
+ secondLatest = latest;
+ latest = entry.getValue();
}
}
- if (posPre != null && posAfter != null) {
- if (posPre.compareTo(posAfter) > 0) {
+ if (secondLatest != null && latest != null) {
+ if (secondLatest.compareTo(latest) > 0) {
sortNeeded = true;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
index fd87ead2017..1ef74915418 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassicTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.anySet;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -46,6 +47,7 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -299,6 +301,68 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersClassicTest {
persistentDispatcher.close();
}
+ @Test
+ public void testSkipSortRecentlyJoinedConsumersIfNotNeeded() throws
Exception {
+ // Inject a sorting counter.
+ LinkedHashMap<Consumer, Position> recentlyJoinedConsumers = new
LinkedHashMap<>();
+ LinkedHashMap<Consumer, Position> spyRecentlyJoinedConsumers =
spy(recentlyJoinedConsumers);
+ AtomicInteger sortTimes = new AtomicInteger(0);
+ doAnswer(invocationOnMock -> {
+ sortTimes.incrementAndGet();
+ return invocationOnMock.callRealMethod();
+ }).when(spyRecentlyJoinedConsumers).clear();
+
+ PersistentStickyKeyDispatcherMultipleConsumersClassic
persistentDispatcher =
+ new PersistentStickyKeyDispatcherMultipleConsumersClassic(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT),
spyRecentlyJoinedConsumers);
+
+ Consumer consumer0 = createMockConsumer();
+ when(consumer0.consumerName()).thenReturn("0");
+ Consumer consumer1 = createMockConsumer();
+ when(consumer0.consumerName()).thenReturn("MzGG2");
+ Consumer consumer2 = createMockConsumer();
+ when(consumer1.consumerName()).thenReturn("rMOYG");
+ Consumer consumer3 = createMockConsumer();
+ when(consumer2.consumerName()).thenReturn("QIleA");
+
+
when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L);
+
when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionFactory.create(-1,
-1));
+ persistentDispatcher.addConsumer(consumer0).join();
+
+
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 1));
+ persistentDispatcher.addConsumer(consumer1).join();
+
+
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 1));
+ persistentDispatcher.addConsumer(consumer2).join();
+
+
when(cursorMock.getReadPosition()).thenReturn(PositionFactory.create(1, 2));
+ persistentDispatcher.addConsumer(consumer3).join();
+
+ assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(),
3);
+
+ Iterator<Map.Entry<Consumer, Position>> itr =
+
persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator();
+
+ Map.Entry<Consumer, Position> entry1 = itr.next();
+ assertEquals(entry1.getValue(), PositionFactory.create(1, 1));
+ assertEquals(entry1.getKey(), consumer1);
+
+ Map.Entry<Consumer, Position> entry2 = itr.next();
+ assertEquals(entry2.getValue(), PositionFactory.create(1, 1));
+ assertEquals(entry2.getKey(), consumer2);
+
+ Map.Entry<Consumer, Position> entry3 = itr.next();
+ assertEquals(entry3.getValue(), PositionFactory.create(1, 2));
+ assertEquals(entry3.getKey(), consumer3);
+
+ // Verify: no sorting was executed
+ assertEquals(sortTimes.get(), 0);
+
+ // cleanup.
+ persistentDispatcher.close();
+ }
+
@Test
public void testSendMarkerMessage() {
try {