This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 41c32d6ddcd [fix][broker][branch-3.0] Fix wrong backlog age metrics
when the mark delete position point to a deleted ledger (#24518) (#24671)
41c32d6ddcd is described below
commit 41c32d6ddcdc181ea9d8c9bf01efb77b236f7e0e
Author: Penghui Li <[email protected]>
AuthorDate: Wed Aug 27 03:06:29 2025 -0700
[fix][broker][branch-3.0] Fix wrong backlog age metrics when the mark
delete position point to a deleted ledger (#24518) (#24671)
---
.../broker/service/persistent/PersistentTopic.java | 11 +++++-
.../PersistentTopicProtectedMethodsTest.java | 44 +++++++++++++++++++++-
.../pulsar/client/admin/internal/BrokersImpl.java | 2 +-
3 files changed, 54 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3065ce3f70e..997e2a06b3f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3363,7 +3363,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
PositionImpl oldestMarkDeletePosition =
oldestMarkDeleteCursorInfo.getPosition();
OldestPositionInfo lastOldestPositionInfo = oldestPositionInfo;
if (lastOldestPositionInfo != null
- &&
oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition())
== 0) {
+ &&
oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition())
== 0
+ && oldestMarkDeletePosition.compareTo(((ManagedLedgerImpl)
ledger).getFirstPosition()) >= 0) {
// Same position, but the cursor causing it has changed?
if
(!lastOldestPositionInfo.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName()))
{
updateResultIfNewer(new OldestPositionInfo(
@@ -3493,6 +3494,14 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
markDeletePositionLedgerInfo =
ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get();
+ // If markDeletePositionLedgerInfo is null (ledger no longer exists
due to retention/cleanup),
+ // use the next valid position instead to get a meaningful timestamp
+ if (markDeletePositionLedgerInfo == null) {
+ PositionImpl nextValidPosition = ((ManagedLedgerImpl)
ledger).getNextValidPosition(markDeletePosition);
+ markDeletePositionLedgerInfo =
ledger.getLedgerInfo(nextValidPosition.getLedgerId()).get();
+ markDeletePosition = nextValidPosition;
+ }
+
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo
positionToCheckLedgerInfo =
markDeletePositionLedgerInfo;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
index b582eb94d12..d6494ef1d8b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
@@ -51,7 +52,6 @@ public class PersistentTopicProtectedMethodsTest extends
ProducerConsumerBase {
}
protected void doInitConf() throws Exception {
- this.conf.setPreciseTimeBasedBacklogQuotaCheck(true);
this.conf.setManagedLedgerMaxEntriesPerLedger(2);
this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10);
this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
@@ -111,4 +111,46 @@ public class PersistentTopicProtectedMethodsTest extends
ProducerConsumerBase {
c1.close();
admin.topics().delete(tp, false);
}
+
+ @Test
+ public void testEstimatedTimeBasedBacklogQuotaCheckWithTopicUnloading()
throws Exception {
+ final String tp =
BrokerTestUtil.newUniqueName("public/default/tp-with-topic-unloading");
+ admin.topics().createNonPartitionedTopic(tp);
+
+ Consumer<byte[]> c1 =
pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe();
+ Producer<byte[]> p1 = pulsarClient.newProducer().topic(tp).create();
+
+ byte[] content = new byte[]{1};
+ for (int i = 0; i < 10; i++) {
+ p1.send(content);
+ }
+
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService().getTopic(tp, false).join().get();
+
+ Awaitility.await().untilAsserted(() -> {
+ admin.brokers().backlogQuotaCheck();
+
assertTrue(persistentTopic.getBestEffortOldestUnacknowledgedMessageAgeSeconds()
> 0);
+ });
+
+ for (int i = 0; i < 10; i++) {
+ c1.acknowledge(c1.receive());
+ }
+
+ Awaitility.await().untilAsserted(() ->
assertEquals(persistentTopic.getBacklogSize(), 0));
+ admin.topics().unload(tp);
+ for (int i = 0; i < 10; i++) {
+ p1.send(content);
+ }
+
+ PersistentTopic persistentTopicNew = (PersistentTopic)
pulsar.getBrokerService()
+ .getTopic(tp, false).join().get();
+ Awaitility.await().untilAsserted(() -> {
+ admin.brokers().backlogQuotaCheck();
+
assertTrue(persistentTopicNew.getBestEffortOldestUnacknowledgedMessageAgeSeconds()
> 0);
+ });
+
+ p1.close();
+ c1.close();
+ admin.topics().delete(tp, false);
+ }
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
index b82c3fd0f41..23c7302fd95 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java
@@ -161,7 +161,7 @@ public class BrokersImpl extends BaseResource implements
Brokers {
@Override
public CompletableFuture<Void> backlogQuotaCheckAsync() {
- WebTarget path = adminBrokers.path("backlogQuotaCheck");
+ WebTarget path = adminBrokers.path("backlog-quota-check");
return asyncGetRequest(path, new FutureCallback<Void>() {});
}