This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b505a0f3ce5 Dispatcher did unnecessary sort for
recentlyJoinedConsumers and printed noisy error logs (#24634)
b505a0f3ce5 is described below
commit b505a0f3ce52d0117243390f1284ae42651019e5
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 9 19:17:02 2025 +0800
Dispatcher did unnecessary sort for recentlyJoinedConsumers and printed
noisy error logs (#24634)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 42 ++++++++++----
...ntStickyKeyDispatcherMultipleConsumersTest.java | 64 ++++++++++++++++++++++
2 files changed, 95 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 14af67b4573..da59e8c6df2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -85,11 +85,26 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
public boolean sortRecentlyJoinedConsumersIfNeeded = true;
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor,
- Subscription subscription, ServiceConfiguration conf,
KeySharedMeta ksm) {
+ Subscription
subscription, ServiceConfiguration conf,
+ KeySharedMeta ksm) {
+ this(topic, cursor, subscription, conf, ksm, null);
+ }
+
+ /**
+ * @param recentlyJoinedConsumers This parameter is only used for testing.
+ */
+ @VisibleForTesting
+ PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor,
+ Subscription subscription, ServiceConfiguration conf,
KeySharedMeta ksm,
+ LinkedHashMap<Consumer,
PositionImpl> recentlyJoinedConsumers) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
- this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new
LinkedHashMap<>();
+ if (recentlyJoinedConsumers == null) {
+ this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null :
new LinkedHashMap<>();
+ } else {
+ this.recentlyJoinedConsumers = recentlyJoinedConsumers;
+ }
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
case AUTO_SPLIT:
@@ -154,6 +169,10 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
});
}
+ /**
+ * Sort items in the collection "recentlyJoinedConsumers" if needed.
+ * Since the order of the queue is checked after each consumer joins, only the
last two items need to be compared.
+ */
private void sortRecentlyJoinedConsumersIfNeeded() {
if (!sortRecentlyJoinedConsumersIfNeeded) {
return;
@@ -161,20 +180,21 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
if (recentlyJoinedConsumers.size() == 1) {
return;
}
- // Since we check the order of queue after each consumer joined, we
can only check the last two items.
boolean sortNeeded = false;
- PositionImpl posPre = null;
- PositionImpl posAfter = null;
+ PositionImpl secondLatest = null;
+ PositionImpl latest = null;
for (Map.Entry<Consumer, PositionImpl> entry :
recentlyJoinedConsumers.entrySet()) {
- if (posPre == null) {
- posPre = entry.getValue();
+ if (secondLatest == null) {
+ secondLatest = entry.getValue();
+ } else if (latest == null) {
+ latest = entry.getValue();
} else {
- posPre = posAfter;
- posAfter = entry.getValue();
+ secondLatest = latest;
+ latest = entry.getValue();
}
}
- if (posPre != null && posAfter != null) {
- if (posPre.compareTo(posAfter) > 0) {
+ if (secondLatest != null && latest != null) {
+ if (secondLatest.compareTo(latest) > 0) {
sortNeeded = true;
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 9b7c98cc30e..20ffcff90ff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.anySet;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -46,6 +47,7 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -299,6 +301,68 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersTest {
persistentDispatcher.close();
}
+ @Test
+ public void testSkipSortRecentlyJoinedConsumersIfNotNeeded() throws
Exception {
+ // Inject a sorting counter.
+ LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = new
LinkedHashMap<>();
+ LinkedHashMap<Consumer, PositionImpl> spyRecentlyJoinedConsumers =
spy(recentlyJoinedConsumers);
+ AtomicInteger sortTimes = new AtomicInteger(0);
+ doAnswer(invocationOnMock -> {
+ sortTimes.incrementAndGet();
+ return invocationOnMock.callRealMethod();
+ }).when(spyRecentlyJoinedConsumers).clear();
+
+ PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher =
+ new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT),
spyRecentlyJoinedConsumers);
+
+ Consumer consumer0 = createMockConsumer();
+ when(consumer0.consumerName()).thenReturn("0");
+ Consumer consumer1 = createMockConsumer();
+ when(consumer0.consumerName()).thenReturn("MzGG2");
+ Consumer consumer2 = createMockConsumer();
+ when(consumer1.consumerName()).thenReturn("rMOYG");
+ Consumer consumer3 = createMockConsumer();
+ when(consumer2.consumerName()).thenReturn("QIleA");
+
+
when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L);
+
when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionImpl.get(-1, -1));
+ persistentDispatcher.addConsumer(consumer0).join();
+
+ when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 1));
+ persistentDispatcher.addConsumer(consumer1).join();
+
+ when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 1));
+ persistentDispatcher.addConsumer(consumer2).join();
+
+ when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(1, 2));
+ persistentDispatcher.addConsumer(consumer3).join();
+
+ assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(),
3);
+
+ Iterator<Map.Entry<Consumer, PositionImpl>> itr =
+
persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator();
+
+ Map.Entry<Consumer, PositionImpl> entry1 = itr.next();
+ assertEquals(entry1.getValue(), PositionImpl.get(1, 1));
+ assertEquals(entry1.getKey(), consumer1);
+
+ Map.Entry<Consumer, PositionImpl> entry2 = itr.next();
+ assertEquals(entry2.getValue(), PositionImpl.get(1, 1));
+ assertEquals(entry2.getKey(), consumer2);
+
+ Map.Entry<Consumer, PositionImpl> entry3 = itr.next();
+ assertEquals(entry3.getValue(), PositionImpl.get(1, 2));
+ assertEquals(entry3.getKey(), consumer3);
+
+ // Verify: no sorting was executed
+ assertEquals(sortTimes.get(), 0);
+
+ // cleanup.
+ persistentDispatcher.close();
+ }
+
@Test
public void testSendMarkerMessage() {
try {