This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e73532a8ae9 [fix][broker] Ensure KeyShared sticky mode consumer
respects assigned ranges (#24730)
e73532a8ae9 is described below
commit e73532a8ae9f443f5942181f9c70bc796a380160
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Sep 15 15:19:54 2025 +0800
[fix][broker] Ensure KeyShared sticky mode consumer respects assigned
ranges (#24730)
---
...ashRangeExclusiveStickyKeyConsumerSelector.java | 112 ++++++++++++---------
...angeExclusiveStickyKeyConsumerSelectorTest.java | 106 +++++++++++++++++--
.../client/api/KeySharedSubscriptionTest.java | 66 ++++++++++++
3 files changed, 228 insertions(+), 56 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 904fb702a94..7fb99838197 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.broker.service;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.IntRange;
-import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.util.FutureUtil;
/**
@@ -38,7 +39,7 @@ import org.apache.pulsar.common.util.FutureUtil;
public class HashRangeExclusiveStickyKeyConsumerSelector implements
StickyKeyConsumerSelector {
private final int rangeSize;
private final Range keyHashRange;
- private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
+ private final ConcurrentSkipListMap<Integer, Pair<Range, Consumer>>
rangeMap;
public HashRangeExclusiveStickyKeyConsumerSelector() {
this(DEFAULT_RANGE_SIZE);
@@ -73,47 +74,41 @@ public class HashRangeExclusiveStickyKeyConsumerSelector
implements StickyKeyCon
+ conflictingConsumer);
}
for (IntRange intRange :
consumer.getKeySharedMeta().getHashRangesList()) {
- rangeMap.put(intRange.getStart(), consumer);
- rangeMap.put(intRange.getEnd(), consumer);
+ rangeMap.put(intRange.getStart(),
Pair.of(Range.of(intRange.getStart(), intRange.getEnd()), consumer));
}
return Optional.empty();
}
@Override
public synchronized Optional<ImpactedConsumersResult>
removeConsumer(Consumer consumer) {
- rangeMap.entrySet().removeIf(entry ->
entry.getValue().equals(consumer));
+ rangeMap.entrySet().removeIf(entry ->
entry.getValue().getRight().equals(consumer));
return Optional.empty();
}
@Override
public synchronized ConsumerHashAssignmentsSnapshot
getConsumerHashAssignmentsSnapshot() {
List<HashRangeAssignment> result = new ArrayList<>();
- Map.Entry<Integer, Consumer> prev = null;
- for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
- if (prev == null) {
- prev = entry;
- } else {
- if (prev.getValue().equals(entry.getValue())) {
- result.add(new HashRangeAssignment(Range.of(prev.getKey(),
entry.getKey()), entry.getValue()));
- }
- prev = null;
- }
+ for (Map.Entry<Integer, Pair<Range, Consumer>> entry :
rangeMap.entrySet()) {
+ Range assignedRange = entry.getValue().getLeft();
+ Consumer assignedConsumer = entry.getValue().getRight();
+ result.add(new HashRangeAssignment(assignedRange,
assignedConsumer));
}
return ConsumerHashAssignmentsSnapshot.of(result);
}
@Override
public Consumer select(int hash) {
- if (rangeMap.size() > 0) {
- Map.Entry<Integer, Consumer> ceilingEntry =
rangeMap.ceilingEntry(hash);
- Map.Entry<Integer, Consumer> floorEntry =
rangeMap.floorEntry(hash);
- Consumer ceilingConsumer = ceilingEntry != null ?
ceilingEntry.getValue() : null;
- Consumer floorConsumer = floorEntry != null ?
floorEntry.getValue() : null;
- if (floorConsumer != null &&
floorConsumer.equals(ceilingConsumer)) {
- return ceilingConsumer;
- } else {
- return null;
- }
+ if (rangeMap.isEmpty()) {
+ return null;
+ }
+
+ Map.Entry<Integer, Pair<Range, Consumer>> floorEntry =
rangeMap.floorEntry(hash);
+ if (floorEntry == null) {
+ return null;
+ }
+ Pair<Range, Consumer> pair = floorEntry.getValue();
+ if (pair.getLeft().contains(hash)) {
+ return pair.getRight();
} else {
return null;
}
@@ -129,10 +124,25 @@ public class HashRangeExclusiveStickyKeyConsumerSelector
implements StickyKeyCon
return FutureUtil.failedFuture(new
BrokerServiceException.ConsumerAssignException(
"Ranges for KeyShared policy must not be empty."));
}
- for (IntRange intRange : ranges) {
- if (intRange.getStart() > intRange.getEnd()) {
+ List<IntRange> sortedRanges = new ArrayList<>(ranges);
+ sortedRanges.sort(Comparator.comparingInt(IntRange::getStart));
+ for (int i = 0; i < sortedRanges.size(); i++) {
+ IntRange currentRange = sortedRanges.get(i);
+ // 1. Validate: check if start > end for the current range
+ if (currentRange.getStart() > currentRange.getEnd()) {
return FutureUtil.failedFuture(
- new
BrokerServiceException.ConsumerAssignException("Fixed hash range start > end"));
+ new
BrokerServiceException.ConsumerAssignException("Fixed hash range start > end
for range: "
+ + "[" + currentRange.getStart() + "," +
currentRange.getEnd() + "]"));
+ }
+ // 2. Validate: check for overlaps with the next range in the
sorted list
+ if (i < sortedRanges.size() - 1) {
+ IntRange nextRange = sortedRanges.get(i + 1);
+ if (areRangesOverlapping(currentRange, nextRange)) {
+ return FutureUtil.failedFuture(
+ new
BrokerServiceException.ConsumerAssignException("Consumer's own ranges conflict:
"
+ + "[" + currentRange.getStart() + "," +
currentRange.getEnd() + "] "
+ + "overlaps with [" + nextRange.getStart()
+ "," + nextRange.getEnd() + "]"));
+ }
}
}
Consumer conflictingConsumer = findConflictingConsumer(ranges);
@@ -143,34 +153,38 @@ public class HashRangeExclusiveStickyKeyConsumerSelector
implements StickyKeyCon
}
}
- private synchronized Consumer findConflictingConsumer(List<IntRange>
ranges) {
- for (IntRange intRange : ranges) {
- Map.Entry<Integer, Consumer> ceilingEntry =
rangeMap.ceilingEntry(intRange.getStart());
- Map.Entry<Integer, Consumer> floorEntry =
rangeMap.floorEntry(intRange.getEnd());
-
- if (floorEntry != null && floorEntry.getKey() >=
intRange.getStart()) {
- return floorEntry.getValue();
- }
-
- if (ceilingEntry != null && ceilingEntry.getKey() <=
intRange.getEnd()) {
- return ceilingEntry.getValue();
+ private synchronized Consumer findConflictingConsumer(List<IntRange>
newConsumerRanges) {
+ for (IntRange newRange : newConsumerRanges) {
+ // 1. Check for a conflict with the closest existing range that
starts at or before newRange's start.
+ Map.Entry<Integer, Pair<Range, Consumer>> conflictBeforeStart =
rangeMap.floorEntry(newRange.getStart());
+ if (conflictBeforeStart != null) {
+ Range existingRange = conflictBeforeStart.getValue().getLeft();
+ if (areRangesOverlapping(newRange, existingRange)) {
+ return conflictBeforeStart.getValue().getRight();
+ }
}
-
- if (ceilingEntry != null && floorEntry != null &&
ceilingEntry.getValue().equals(floorEntry.getValue())) {
- KeySharedMeta keySharedMeta =
ceilingEntry.getValue().getKeySharedMeta();
- for (IntRange range : keySharedMeta.getHashRangesList()) {
- int start = Math.max(intRange.getStart(),
range.getStart());
- int end = Math.min(intRange.getEnd(), range.getEnd());
- if (end >= start) {
- return ceilingEntry.getValue();
- }
+ // 2. Check for a conflict with the closest existing range that
starts at or after newRange's start.
+ Map.Entry<Integer, Pair<Range, Consumer>> conflictAfterStart =
rangeMap.ceilingEntry(newRange.getStart());
+ if (conflictAfterStart != null) {
+ Range existingRange = conflictAfterStart.getValue().getLeft();
+ if (areRangesOverlapping(newRange, existingRange)) {
+ return conflictAfterStart.getValue().getRight();
}
}
}
return null;
}
- Map<Integer, Consumer> getRangeConsumer() {
+
+ private static boolean areRangesOverlapping(IntRange range1, Range range2)
{
+ return Math.max(range1.getStart(), range2.getStart()) <=
Math.min(range1.getEnd(), range2.getEnd());
+ }
+
+ private static boolean areRangesOverlapping(IntRange range1, IntRange
range2) {
+ return Math.max(range1.getStart(), range2.getStart()) <=
Math.min(range1.getEnd(), range2.getEnd());
+ }
+
+ Map<Integer, Pair<Range, Consumer>> getRangeConsumer() {
return Collections.unmodifiableMap(rangeMap);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
index f3828981c8e..7dd3bde4489 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
@@ -50,7 +50,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
selector.addConsumer(consumer1).get();
- Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+ Assert.assertEquals(selector.getRangeConsumer().size(), 1);
Consumer selectedConsumer;
for (int i = 0; i < 3; i++) {
selectedConsumer = selector.select(i);
@@ -66,7 +66,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
when(consumer2.getKeySharedMeta()).thenReturn(keySharedMeta2);
Assert.assertEquals(consumer2.getKeySharedMeta(), keySharedMeta2);
selector.addConsumer(consumer2).get();
- Assert.assertEquals(selector.getRangeConsumer().size(), 4);
+ Assert.assertEquals(selector.getRangeConsumer().size(), 2);
for (int i = 3; i < 10; i++) {
selectedConsumer = selector.select(i);
@@ -79,7 +79,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
}
selector.removeConsumer(consumer1);
- Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+ Assert.assertEquals(selector.getRangeConsumer().size(), 1);
selectedConsumer = selector.select(1);
Assert.assertNull(selectedConsumer);
@@ -89,6 +89,38 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest
{
Assert.assertNull(selectedConsumer);
}
+
+ @Test
+ public void testConsumerSelectWithMultipleRanges() throws
ExecutionException, InterruptedException {
+
+ HashRangeExclusiveStickyKeyConsumerSelector selector = new
HashRangeExclusiveStickyKeyConsumerSelector(20);
+ Consumer consumer1 = mock(Consumer.class);
+ KeySharedMeta keySharedMeta1 = new KeySharedMeta()
+ .setKeySharedMode(KeySharedMode.STICKY);
+ keySharedMeta1.addHashRange().setStart(2).setEnd(6);
+ keySharedMeta1.addHashRange().setStart(10).setEnd(15);
+ when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
+ Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
+ selector.addConsumer(consumer1).get();
+ Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+ Consumer selectedConsumer;
+ for (int i = 2; i <= 6; i++) {
+ selectedConsumer = selector.select(i);
+ Assert.assertEquals(selectedConsumer, consumer1);
+ }
+ for (int i = 10; i <= 15; i++) {
+ selectedConsumer = selector.select(i);
+ Assert.assertEquals(selectedConsumer, consumer1);
+ }
+ selectedConsumer = selector.select(1);
+ Assert.assertNull(selectedConsumer);
+ selectedConsumer = selector.select(9);
+ Assert.assertNull(selectedConsumer);
+ selectedConsumer = selector.select(18);
+ Assert.assertNull(selectedConsumer);
+
+ }
+
@Test
public void testEmptyRanges() {
HashRangeExclusiveStickyKeyConsumerSelector selector = new
HashRangeExclusiveStickyKeyConsumerSelector(10);
@@ -189,6 +221,66 @@ public class
HashRangeExclusiveStickyKeyConsumerSelectorTest {
}
}
+ @Test
+ public void testShouldConflictConsumerWithSelfOverlappingRanges() throws
ExecutionException, InterruptedException {
+ HashRangeExclusiveStickyKeyConsumerSelector selector = new
HashRangeExclusiveStickyKeyConsumerSelector(10);
+
+ final List<IntRange> testRanges = new ArrayList<>();
+ testRanges.add(new IntRange().setStart(1).setEnd(3));
+ testRanges.add(new IntRange().setStart(2).setEnd(2));
+ testRanges.add(new IntRange().setStart(5).setEnd(5));
+ testRanges.add(new IntRange().setStart(4).setEnd(6));
+ KeySharedMeta keySharedMeta = new KeySharedMeta()
+ .setKeySharedMode(KeySharedMode.STICKY);
+ keySharedMeta.addAllHashRanges(testRanges);
+ Consumer consumer = mock(Consumer.class);
+ when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta);
+ Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
+
+ try {
+ selector.addConsumer(consumer).get();
+ Assert.fail("should be failed");
+ } catch (ExecutionException | InterruptedException e) {
+ // ignore
+ }
+
+ Assert.assertEquals(selector.getRangeConsumer().size(), 0);
+ }
+
+ @Test
+ public void testShouldConflictConsumerWithBoundaryRanges() throws
ExecutionException, InterruptedException {
+ HashRangeExclusiveStickyKeyConsumerSelector selector = new
HashRangeExclusiveStickyKeyConsumerSelector(10);
+ TransportCnx transportCnx = mock(TransportCnx.class);
+
+ // 1. add consumer 1 with range [2, 5]
+ KeySharedMeta keySharedMeta1 = new KeySharedMeta()
+ .setKeySharedMode(KeySharedMode.STICKY);
+ keySharedMeta1.addAllHashRanges(List.of(new
IntRange().setStart(2).setEnd(5)));
+ Consumer consumer1 = mock(Consumer.class);
+ when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
+ when(consumer1.cnx()).thenReturn(transportCnx);
+
when(transportCnx.checkConnectionLiveness()).thenReturn(CompletableFuture.completedFuture(null));
+ Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
+ selector.addConsumer(consumer1).get();
+ Assert.assertEquals(selector.getRangeConsumer().size(), 1);
+
+ // 2. add consumer 2 with range [5, 10]; it should conflict with
consumer 1 because both ranges include hash 5
+ KeySharedMeta keySharedMeta2 = new KeySharedMeta()
+ .setKeySharedMode(KeySharedMode.STICKY);
+ keySharedMeta2.addAllHashRanges(List.of(new
IntRange().setStart(5).setEnd(10)));
+ Consumer consumer2 = mock(Consumer.class);
+ when(consumer2.cnx()).thenReturn(transportCnx);
+
when(transportCnx.checkConnectionLiveness()).thenReturn(CompletableFuture.completedFuture(null));
+ when(consumer2.getKeySharedMeta()).thenReturn(keySharedMeta2);
+ try {
+ selector.addConsumer(consumer2).get();
+ Assert.fail("should be failed");
+ } catch (ExecutionException | InterruptedException e) {
+ // ignore
+ }
+ Assert.assertEquals(selector.getRangeConsumer().size(), 1);
+ }
+
@Test
public void testSingleRangeConflict() throws ExecutionException,
InterruptedException {
HashRangeExclusiveStickyKeyConsumerSelector selector = new
HashRangeExclusiveStickyKeyConsumerSelector(10);
@@ -202,7 +294,7 @@ public class
HashRangeExclusiveStickyKeyConsumerSelectorTest {
when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
selector.addConsumer(consumer1).get();
- Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+ Assert.assertEquals(selector.getRangeConsumer().size(), 1);
final List<IntRange> testRanges = new ArrayList<>();
testRanges.add(new IntRange().setStart(4).setEnd(6));
@@ -228,7 +320,7 @@ public class
HashRangeExclusiveStickyKeyConsumerSelectorTest {
} catch (ExecutionException | InterruptedException e) {
// ignore
}
- Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+ Assert.assertEquals(selector.getRangeConsumer().size(), 1);
}
}
@@ -245,7 +337,7 @@ public class
HashRangeExclusiveStickyKeyConsumerSelectorTest {
when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
selector.addConsumer(consumer1).get();
- Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+ Assert.assertEquals(selector.getRangeConsumer().size(), 1);
final List<List<IntRange>> testRanges = new ArrayList<>();
testRanges.add(List.of(
@@ -271,7 +363,7 @@ public class
HashRangeExclusiveStickyKeyConsumerSelectorTest {
} catch (ExecutionException | InterruptedException e) {
// ignore
}
- Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+ Assert.assertEquals(selector.getRangeConsumer().size(), 1);
}
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 80b5070077d..7f7de33b43a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -2668,4 +2668,70 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
logTopicStats(topic);
}
}
+
+ @Test
+ public void testCustomStickyRange() throws Exception {
+ int messageCount = 100;
+ final String topicName =
"persistent://public/default/test-sticky-range-" + System.nanoTime();
+ final String subscriptionName = "sub-sticky-range";
+
+ // 0. Init topic and subscription
+ admin.topics().createPartitionedTopic(topicName, 4);
+ admin.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+
+ // 1. Create a producer and send messages
+ try (Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .create()) {
+ for (int i = 0; i < messageCount; i++) {
+ String key = String.valueOf(i);
+ producer.newMessage()
+ .value(String.valueOf(i).getBytes())
+ .key(key)
+ .send();
+ }
+ }
+
+ // 2. Create the consumers one by one; each consumes messages with a different
sticky hash range
+ KeySharedPolicy.KeySharedPolicySticky policy1 =
KeySharedPolicy.stickyHashRange()
+ .ranges(Range.of(0, 9999), Range.of(20000, 29999),
Range.of(40000, 49999));
+ KeySharedPolicy.KeySharedPolicySticky policy2 =
KeySharedPolicy.stickyHashRange()
+ .ranges(Range.of(10000, 19999), Range.of(30000, 39999),
Range.of(50000, 65535));
+
+ List<KeySharedPolicy.KeySharedPolicySticky> policies =
Arrays.asList(policy1, policy2);
+ int[] receivedCounts = new int[2];
+ for (int i = 0; i < policies.size(); i++) {
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .keySharedPolicy(policies.get(i))
+ .subscribe();
+ while (true) {
+ try {
+ Message<byte[]> msg = consumer.receive(2,
TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ receivedCounts[i]++;
+ log.debug("Consumer #{} received message with key:{}
total:{}",
+ i + 1, msg.getKey(), receivedCounts[i]);
+ } catch (Exception e) {
+ break;
+ }
+ }
+ // make sure the consumer is closed before starting the next consumer
+ consumer.close();
+ }
+
+ int consumer1Received = receivedCounts[0];
+ int consumer2Received = receivedCounts[1];
+
+ log.info("Consumer1 total received: {}", consumer1Received);
+ log.info("Consumer2 total received: {}", consumer2Received);
+ Assert.assertEquals(consumer1Received + consumer2Received,
messageCount,
+ "Total messages received by both consumers should be " +
messageCount);
+ }
}