This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e6f66dfa016 [fix][broker] Fix NPE and annotate nullable return values
for ManagedCursorContainer (#24706)
e6f66dfa016 is described below
commit e6f66dfa016fcad74ca72808fe7654f1204a5920
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Sep 5 15:54:14 2025 +0800
[fix][broker] Fix NPE and annotate nullable return values for
ManagedCursorContainer (#24706)
---
.../org/apache/bookkeeper/mledger/ManagedLedger.java | 4 +++-
.../bookkeeper/mledger/impl/ManagedCursorContainer.java | 17 +++++++++++++----
.../pulsar/broker/service/BacklogQuotaManager.java | 3 +++
.../broker/service/persistent/PersistentTopic.java | 7 ++++++-
4 files changed, 25 insertions(+), 6 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 20ff66aa70e..8fb083bcd02 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -38,6 +38,7 @@ import
org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.jspecify.annotations.Nullable;
/**
* A ManagedLedger it's a superset of a BookKeeper ledger concept.
@@ -579,8 +580,9 @@ public interface ManagedLedger {
/**
* Get the slowest consumer.
*
- * @return the slowest consumer
+ * @return the slowest consumer or null if there is no consumer
*/
+ @Nullable
ManagedCursor getSlowestConsumer();
/**
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index cc740f5d37c..677b2a0a6aa 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -23,12 +23,17 @@ import lombok.experimental.UtilityClass;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+import org.jspecify.annotations.Nullable;
/**
* Contains cursors for a ManagedLedger.
* <p>
* The goal is to be able to find out the slowest cursor and hence decide
which is the oldest ledger we need to keep.
*/
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Evolving
public interface ManagedCursorContainer extends Iterable<ManagedCursor> {
/**
* Adds a cursor to the container with the specified position.
@@ -45,6 +50,7 @@ public interface ManagedCursorContainer extends
Iterable<ManagedCursor> {
* @param name the name of the cursor
* @return the ManagedCursor if found, otherwise null
*/
+ @Nullable
ManagedCursor get(String name);
/**
@@ -63,29 +69,32 @@ public interface ManagedCursorContainer extends
Iterable<ManagedCursor> {
* @param cursor the cursor to update the position for
* @param newPosition the updated position for the cursor
* @return a pair of positions, representing the previous slowest cursor
and the new slowest cursor (after the
- * update).
+ * update) or null if the cursor does not exist.
*/
Pair<Position, Position> cursorUpdated(ManagedCursor cursor, Position
newPosition);
/**
* Gets the position of the slowest cursor.
*
- * @return the position of the slowest cursor
+ * @return the position of the slowest cursor or null if there is no cursor
*/
+ @Nullable
Position getSlowestCursorPosition();
/**
* Gets the slowest cursor.
*
- * @return the slowest ManagedCursor
+ * @return the slowest ManagedCursor or null if there is no cursor
*/
+ @Nullable
ManagedCursor getSlowestCursor();
/**
* Gets the cursor's {@link CursorInfo} with the oldest position.
*
- * @return the CursorInfo containing the cursor and its position
+ * @return the CursorInfo containing the cursor and its position or null
if there is no cursor
*/
+ @Nullable
CursorInfo getCursorWithOldestPosition();
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 689e8514078..5e811ff1dce 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -219,6 +219,9 @@ public class BacklogQuotaManager {
try {
for (; ; ) {
ManagedCursor slowestConsumer =
mLedger.getSlowestConsumer();
+ if (slowestConsumer == null) {
+ break;
+ }
Position oldestPosition =
slowestConsumer.getMarkDeletedPosition();
if (log.isDebugEnabled()) {
log.debug("[{}] slowest consumer mark delete position
is [{}], read position is [{}]",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cc6cbb348f3..657f03d6b38 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2166,7 +2166,12 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml,
int messageTtlInSeconds) {
// Find the target position at one time, then expire all subscriptions
and replicators.
- ManagedCursor cursor =
ml.getCursors().getCursorWithOldestPosition().getCursor();
+ final var cursorWithOldestPosition =
ml.getCursors().getCursorWithOldestPosition();
+ if (cursorWithOldestPosition == null) {
+ // Skip checking message expiry for topics without subscription
+ return;
+ }
+ ManagedCursor cursor = cursorWithOldestPosition.getCursor();
PersistentMessageFinder finder = new PersistentMessageFinder(topic,
cursor, brokerService.getPulsar()
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
// Find the target position.