This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 77d18bfcd372509c3074beea024f1189581116ef Author: Yunze Xu <[email protected]> AuthorDate: Fri Sep 5 15:54:14 2025 +0800 [fix][broker] Fix NPE and annotate nullable return values for ManagedCursorContainer (#24706) (cherry picked from commit e6f66dfa016fcad74ca72808fe7654f1204a5920) --- .../org/apache/bookkeeper/mledger/ManagedLedger.java | 4 +++- .../bookkeeper/mledger/impl/ManagedCursorContainer.java | 17 +++++++++++++---- .../pulsar/broker/service/BacklogQuotaManager.java | 3 +++ .../broker/service/persistent/PersistentTopic.java | 7 ++++++- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 20ff66aa70e..8fb083bcd02 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.jspecify.annotations.Nullable; /** * A ManagedLedger it's a superset of a BookKeeper ledger concept. @@ -579,8 +580,9 @@ public interface ManagedLedger { /** * Get the slowest consumer. * - * @return the slowest consumer + * @return the slowest consumer or null if there is no consumer */ + @Nullable ManagedCursor getSlowestConsumer(); /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index cc740f5d37c..677b2a0a6aa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -23,12 +23,17 @@ import lombok.experimental.UtilityClass; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; +import org.jspecify.annotations.Nullable; /** * Contains cursors for a ManagedLedger. * <p> * The goal is to be able to find out the slowest cursor and hence decide which is the oldest ledger we need to keep. */ [email protected] [email protected] public interface ManagedCursorContainer extends Iterable<ManagedCursor> { /** * Adds a cursor to the container with the specified position. @@ -45,6 +50,7 @@ public interface ManagedCursorContainer extends Iterable<ManagedCursor> { * @param name the name of the cursor * @return the ManagedCursor if found, otherwise null */ + @Nullable ManagedCursor get(String name); /** @@ -63,29 +69,32 @@ public interface ManagedCursorContainer extends Iterable<ManagedCursor> { * @param cursor the cursor to update the position for * @param newPosition the updated position for the cursor * @return a pair of positions, representing the previous slowest cursor and the new slowest cursor (after the - * update). + * update) or null if the cursor does not exist. */ Pair<Position, Position> cursorUpdated(ManagedCursor cursor, Position newPosition); /** * Gets the position of the slowest cursor. * - * @return the position of the slowest cursor + * @return the position of the slowest cursor or null if there is no cursor */ + @Nullable Position getSlowestCursorPosition(); /** * Gets the slowest cursor. * - * @return the slowest ManagedCursor + * @return the slowest ManagedCursor or null if there is no cursor */ + @Nullable ManagedCursor getSlowestCursor(); /** * Gets the cursor's {@link CursorInfo} with the oldest position. * - * @return the CursorInfo containing the cursor and its position + * @return the CursorInfo containing the cursor and its position or null if there is no cursor */ + @Nullable CursorInfo getCursorWithOldestPosition(); /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 689e8514078..5e811ff1dce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -219,6 +219,9 @@ public class BacklogQuotaManager { try { for (; ; ) { ManagedCursor slowestConsumer = mLedger.getSlowestConsumer(); + if (slowestConsumer == null) { + break; + } Position oldestPosition = slowestConsumer.getMarkDeletedPosition(); if (log.isDebugEnabled()) { log.debug("[{}] slowest consumer mark delete position is [{}], read position is [{}]", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cc6cbb348f3..657f03d6b38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2166,7 +2166,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml, int messageTtlInSeconds) { // Find the target position at one time, then expire all subscriptions and replicators. - ManagedCursor cursor = ml.getCursors().getCursorWithOldestPosition().getCursor(); + final var cursorWithOldestPosition = ml.getCursors().getCursorWithOldestPosition(); + if (cursorWithOldestPosition == null) { + // Skip checking message expiry for topics without subscription + return; + } + ManagedCursor cursor = cursorWithOldestPosition.getCursor(); PersistentMessageFinder finder = new PersistentMessageFinder(topic, cursor, brokerService.getPulsar() .getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis()); // Find the target position.
