This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f66dfa016 [fix][broker] Fix NPE and annotate nullable return values 
for ManagedCursorContainer (#24706)
e6f66dfa016 is described below

commit e6f66dfa016fcad74ca72808fe7654f1204a5920
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 5 15:54:14 2025 +0800

    [fix][broker] Fix NPE and annotate nullable return values for 
ManagedCursorContainer (#24706)
---
 .../org/apache/bookkeeper/mledger/ManagedLedger.java    |  4 +++-
 .../bookkeeper/mledger/impl/ManagedCursorContainer.java | 17 +++++++++++++----
 .../pulsar/broker/service/BacklogQuotaManager.java      |  3 +++
 .../broker/service/persistent/PersistentTopic.java      |  7 ++++++-
 4 files changed, 25 insertions(+), 6 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 20ff66aa70e..8fb083bcd02 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -38,6 +38,7 @@ import 
org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.jspecify.annotations.Nullable;
 
 /**
  * A ManagedLedger it's a superset of a BookKeeper ledger concept.
@@ -579,8 +580,9 @@ public interface ManagedLedger {
     /**
      * Get the slowest consumer.
      *
-     * @return the slowest consumer
+     * @return the slowest consumer or null if there is no consumer
      */
+    @Nullable
     ManagedCursor getSlowestConsumer();
 
     /**
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index cc740f5d37c..677b2a0a6aa 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -23,12 +23,17 @@ import lombok.experimental.UtilityClass;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+import org.jspecify.annotations.Nullable;
 
 /**
  * Contains cursors for a ManagedLedger.
  * <p>
  * The goal is to be able to find out the slowest cursor and hence decide 
which is the oldest ledger we need to keep.
  */
[email protected]
[email protected]
 public interface ManagedCursorContainer extends Iterable<ManagedCursor> {
     /**
      * Adds a cursor to the container with the specified position.
@@ -45,6 +50,7 @@ public interface ManagedCursorContainer extends 
Iterable<ManagedCursor> {
      * @param name the name of the cursor
      * @return the ManagedCursor if found, otherwise null
      */
+    @Nullable
     ManagedCursor get(String name);
 
     /**
@@ -63,29 +69,32 @@ public interface ManagedCursorContainer extends 
Iterable<ManagedCursor> {
      * @param cursor the cursor to update the position for
      * @param newPosition the updated position for the cursor
      * @return a pair of positions, representing the previous slowest cursor 
and the new slowest cursor (after the
-     *         update).
+     *         update) or null if the cursor does not exist.
      */
     Pair<Position, Position> cursorUpdated(ManagedCursor cursor, Position 
newPosition);
 
     /**
      * Gets the position of the slowest cursor.
      *
-     * @return the position of the slowest cursor
+     * @return the position of the slowest cursor or null if there is no cursor
      */
+    @Nullable
     Position getSlowestCursorPosition();
 
     /**
      * Gets the slowest cursor.
      *
-     * @return the slowest ManagedCursor
+     * @return the slowest ManagedCursor or null if there is no cursor
      */
+    @Nullable
     ManagedCursor getSlowestCursor();
 
     /**
      * Gets the cursor's {@link CursorInfo} with the oldest position.
      *
-     * @return the CursorInfo containing the cursor and its position
+     * @return the CursorInfo containing the cursor and its position or null 
if there is no cursor
      */
+    @Nullable
     CursorInfo getCursorWithOldestPosition();
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 689e8514078..5e811ff1dce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -219,6 +219,9 @@ public class BacklogQuotaManager {
             try {
                 for (; ; ) {
                     ManagedCursor slowestConsumer = 
mLedger.getSlowestConsumer();
+                    if (slowestConsumer == null) {
+                        break;
+                    }
                     Position oldestPosition = 
slowestConsumer.getMarkDeletedPosition();
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] slowest consumer mark delete position 
is [{}], read position is [{}]",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cc6cbb348f3..657f03d6b38 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2166,7 +2166,12 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml, 
int messageTtlInSeconds) {
         // Find the target position at one time, then expire all subscriptions 
and replicators.
-        ManagedCursor cursor = 
ml.getCursors().getCursorWithOldestPosition().getCursor();
+        final var cursorWithOldestPosition = 
ml.getCursors().getCursorWithOldestPosition();
+        if (cursorWithOldestPosition == null) {
+            // Skip checking message expiry for topics without subscription
+            return;
+        }
+        ManagedCursor cursor = cursorWithOldestPosition.getCursor();
         PersistentMessageFinder finder = new PersistentMessageFinder(topic, 
cursor, brokerService.getPulsar()
                 
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
         // Find the target position.

Reply via email to