Copilot commented on code in PR #24730:
URL: https://github.com/apache/pulsar/pull/24730#discussion_r2341218462
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##########
@@ -2537,4 +2537,70 @@ public void
testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementati
logTopicStats(topic);
}
}
+
+ @Test
+ public void testCustomStickyRange() throws Exception {
+ int messageCount = 100;
+ final String topicName =
"persistent://public/default/test-sticky-range-" + System.nanoTime();
+ final String subscriptionName = "sub-sticky-range";
+
+ // 0. Init topic and subscription
+ admin.topics().createPartitionedTopic(topicName, 4);
+ admin.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+
+ // 1. Create a producer and send messages
+ try (Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .create()) {
+ for (int i = 0; i < messageCount; i++) {
+ String key = String.valueOf(i);
+ producer.newMessage()
+ .value(String.valueOf(i).getBytes())
+ .key(key)
+ .send();
+ }
+ }
+
+ // 3. One by one create consumers consume message with different sticky hash range
Review Comment:
This comment is ungrammatical: 'create consumers consume message' is missing
'to', and 'message' and 'range' should be plural. Use either 'One by one create
consumers to consume messages with different sticky hash ranges' or, more
naturally, 'Create consumers one by one to consume messages with different
sticky hash ranges'.
```suggestion
// 3. Create consumers one by one to consume messages with different sticky hash ranges
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java:
##########
@@ -143,34 +153,38 @@ private synchronized CompletableFuture<Void>
validateKeySharedMeta(Consumer cons
}
}
- private synchronized Consumer findConflictingConsumer(List<IntRange>
ranges) {
- for (IntRange intRange : ranges) {
- Map.Entry<Integer, Consumer> ceilingEntry =
rangeMap.ceilingEntry(intRange.getStart());
- Map.Entry<Integer, Consumer> floorEntry =
rangeMap.floorEntry(intRange.getEnd());
-
- if (floorEntry != null && floorEntry.getKey() >=
intRange.getStart()) {
- return floorEntry.getValue();
- }
-
- if (ceilingEntry != null && ceilingEntry.getKey() <=
intRange.getEnd()) {
- return ceilingEntry.getValue();
+ private synchronized Consumer findConflictingConsumer(List<IntRange>
newConsumerRanges) {
+ for (IntRange newRange : newConsumerRanges) {
+ // 1. Check for potential conflicts with existing ranges that
start before newRange's start.
+ Map.Entry<Integer, Pair<Range, Consumer>> conflictBeforeStart =
rangeMap.floorEntry(newRange.getStart());
+ if (conflictBeforeStart != null) {
+ Range existingRange = conflictBeforeStart.getValue().getLeft();
+ if (checkRangesOverlap(newRange, existingRange)) {
+ return conflictBeforeStart.getValue().getRight();
+ }
}
-
- if (ceilingEntry != null && floorEntry != null &&
ceilingEntry.getValue().equals(floorEntry.getValue())) {
- KeySharedMeta keySharedMeta =
ceilingEntry.getValue().getKeySharedMeta();
- for (IntRange range : keySharedMeta.getHashRangesList()) {
- int start = Math.max(intRange.getStart(),
range.getStart());
- int end = Math.min(intRange.getEnd(), range.getEnd());
- if (end >= start) {
- return ceilingEntry.getValue();
- }
+ // 2. Check for potential conflicts with existing ranges that
start after newRange's start.
+ Map.Entry<Integer, Pair<Range, Consumer>> conflictAfterStart =
rangeMap.ceilingEntry(newRange.getStart());
+ if (conflictAfterStart != null) {
+ Range existingRange = conflictAfterStart.getValue().getLeft();
+ if (checkRangesOverlap(newRange, existingRange)) {
+ return conflictAfterStart.getValue().getRight();
}
}
}
return null;
}
- Map<Integer, Consumer> getRangeConsumer() {
+
+ private boolean checkRangesOverlap(IntRange range1, Range range2) {
+ return Math.max(range1.getStart(), range2.getStart()) <=
Math.min(range1.getEnd(), range2.getEnd());
+ }
+
+ private boolean checkRangesOverlap(IntRange range1, IntRange range2) {
+ return Math.max(range1.getStart(), range2.getStart()) <=
Math.min(range1.getEnd(), range2.getEnd());
Review Comment:
These two `checkRangesOverlap` overloads have identical logic and differ only
in the type of the second parameter (`Range` vs. `IntRange`). Consider
consolidating them into a single shared helper, or have one overload delegate
to the other, to remove the duplication.
```suggestion
return checkRangesOverlap(range1, new Range(range2.getStart(),
range2.getEnd()));
```