This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 7988f4fb241 [fix][broker] Fix NPE and annotate nullable return values 
for ManagedCursorContainer (#24706)
7988f4fb241 is described below

commit 7988f4fb24131ea877027d38192206279ba10bdf
Author: Jiwe Guo <[email protected]>
AuthorDate: Wed Sep 10 13:26:36 2025 +0800

    [fix][broker] Fix NPE and annotate nullable return values for 
ManagedCursorContainer (#24706)
---
 .../src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java | 4 +++-
 .../java/org/apache/pulsar/broker/service/BacklogQuotaManager.java | 3 +++
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java   | 7 ++++++-
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 05e376218c6..acaa21fcda0 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -38,6 +38,7 @@ import 
org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.jspecify.annotations.Nullable;
 
 /**
  * A ManagedLedger it's a superset of a BookKeeper ledger concept.
@@ -544,8 +545,9 @@ public interface ManagedLedger {
     /**
      * Get the slowest consumer.
      *
-     * @return the slowest consumer
+     * @return the slowest consumer or null if there is no consumer
      */
+    @Nullable
     ManagedCursor getSlowestConsumer();
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 689e8514078..5e811ff1dce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -219,6 +219,9 @@ public class BacklogQuotaManager {
             try {
                 for (; ; ) {
                     ManagedCursor slowestConsumer = 
mLedger.getSlowestConsumer();
+                    if (slowestConsumer == null) {
+                        break;
+                    }
                     Position oldestPosition = 
slowestConsumer.getMarkDeletedPosition();
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] slowest consumer mark delete position 
is [{}], read position is [{}]",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d0424ec912f..f30332a3afc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2162,7 +2162,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml, 
int messageTtlInSeconds) {
         // Find the target position at one time, then expire all subscriptions 
and replicators.
-        ManagedCursor cursor = 
ml.getCursors().getCursorWithOldestPosition().getCursor();
+        final var cursorWithOldestPosition = 
ml.getCursors().getCursorWithOldestPosition();
+        if (cursorWithOldestPosition == null) {
+            // Skip checking message expiry for topics without subscription
+            return;
+        }
+        ManagedCursor cursor = cursorWithOldestPosition.getCursor();
         PersistentMessageFinder finder = new PersistentMessageFinder(topic, 
cursor, brokerService.getPulsar()
                 
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
         // Find the target position.

Reply via email to