This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 7988f4fb241 [fix][broker] Fix NPE and annotate nullable return values for ManagedCursorContainer (#24706)
7988f4fb241 is described below
commit 7988f4fb24131ea877027d38192206279ba10bdf
Author: Jiwe Guo <[email protected]>
AuthorDate: Wed Sep 10 13:26:36 2025 +0800
[fix][broker] Fix NPE and annotate nullable return values for ManagedCursorContainer (#24706)
---
.../src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java | 4 +++-
.../java/org/apache/pulsar/broker/service/BacklogQuotaManager.java | 3 +++
.../apache/pulsar/broker/service/persistent/PersistentTopic.java | 7 ++++++-
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 05e376218c6..acaa21fcda0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.jspecify.annotations.Nullable;
/**
* A ManagedLedger it's a superset of a BookKeeper ledger concept.
@@ -544,8 +545,9 @@ public interface ManagedLedger {
/**
* Get the slowest consumer.
*
- * @return the slowest consumer
+ * @return the slowest consumer or null if there is no consumer
*/
+ @Nullable
ManagedCursor getSlowestConsumer();
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 689e8514078..5e811ff1dce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -219,6 +219,9 @@ public class BacklogQuotaManager {
try {
for (; ; ) {
ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
+ if (slowestConsumer == null) {
+ break;
+ }
Position oldestPosition = slowestConsumer.getMarkDeletedPosition();
if (log.isDebugEnabled()) {
log.debug("[{}] slowest consumer mark delete position is [{}], read position is [{}]",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d0424ec912f..f30332a3afc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2162,7 +2162,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml, int messageTtlInSeconds) {
// Find the target position at one time, then expire all subscriptions and replicators.
- ManagedCursor cursor = ml.getCursors().getCursorWithOldestPosition().getCursor();
+ final var cursorWithOldestPosition = ml.getCursors().getCursorWithOldestPosition();
+ if (cursorWithOldestPosition == null) {
+ // Skip checking message expiry for topics without subscription
+ return;
+ }
+ ManagedCursor cursor = cursorWithOldestPosition.getCursor();
PersistentMessageFinder finder = new PersistentMessageFinder(topic, cursor, brokerService.getPulsar()
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
// Find the target position.