This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d275bd4004a [improve][broker]Remove block calling that named
cursor.asyncGetNth when expiring messages (#24606)
d275bd4004a is described below
commit d275bd4004ab4d11e9527e10c3069126689ca10a
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 8 12:46:17 2025 +0800
[improve][broker]Remove block calling that named cursor.asyncGetNth when
expiring messages (#24606)
---
.../pulsar/broker/service/MessageExpirer.java | 3 +
.../nonpersistent/NonPersistentSubscription.java | 6 ++
.../persistent/PersistentMessageExpiryMonitor.java | 6 ++
.../service/persistent/PersistentReplicator.java | 17 ++++
.../service/persistent/PersistentSubscription.java | 20 +++++
.../broker/service/persistent/PersistentTopic.java | 41 +++++++++-
.../service/PersistentMessageFinderTest.java | 94 ++++++++++++++++++++++
.../pulsar/broker/service/ReplicatorTest.java | 30 +++++++
8 files changed, 214 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
index 7cb1d2a904a..3008717a3df 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -27,4 +28,6 @@ public interface MessageExpirer {
boolean expireMessages(Position position);
boolean expireMessages(int messageTTLInSeconds);
+
+ CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index d469ce1daa1..549a17b2ae4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -472,6 +472,12 @@ public class NonPersistentSubscription extends
AbstractSubscription {
+ " non-persistent topic.");
}
+    /**
+     * Asynchronous TTL-based expiry entry point for a non-persistent subscription.
+     *
+     * <p>This operation is not supported here, so no expiry work is started.
+     *
+     * @param messageTTLInSeconds not used
+     * @return a future that is already completed exceptionally with an
+     *         {@link UnsupportedOperationException}
+     */
+    @Override
+    public CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds) {
+        final UnsupportedOperationException unsupported = new UnsupportedOperationException(
+                "Expire message by timestamp is not" + " supported for non-persistent topic.");
+        return CompletableFuture.failedFuture(unsupported);
+    }
+
@Override
public boolean expireMessages(Position position) {
throw new UnsupportedOperationException("Expire message by position is
not supported for"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index f5199c14b2b..e1f5ade4ee1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
@@ -84,6 +85,11 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback, Messag
&&
this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
}
+    /**
+     * Asynchronous counterpart of {@link #expireMessages(int)}.
+     *
+     * <p>The synchronous expiry check is submitted to the topic's ordered executor rather than being run by
+     * the calling thread.
+     *
+     * @param messageTTLInSeconds message time-to-live in seconds, handed unchanged to
+     *                            {@link #expireMessages(int)}
+     * @return a future completed with the value returned by {@link #expireMessages(int)}
+     */
+    @Override
+    public CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds) {
+        return CompletableFuture.supplyAsync(
+                () -> {
+                    return expireMessages(messageTTLInSeconds);
+                },
+                topic.getOrderedExecutor());
+    }
+
@Override
public boolean expireMessages(int messageTTLInSeconds) {
if (!expirationCheckInProgressUpdater.compareAndSet(this, FALSE,
TRUE)) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 5952bb14d22..559b39e0fb4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -681,6 +681,23 @@ public abstract class PersistentReplicator extends
AbstractReplicator
return expiryMonitor.expireMessages(messageTTLInSeconds);
}
+    /**
+     * Asynchronously expires messages of this replicator's cursor that are older than the given TTL.
+     *
+     * <ul>
+     *   <li>Empty backlog: completes with {@code false} immediately.</li>
+     *   <li>Backlog below {@code MINIMUM_BACKLOG_FOR_EXPIRY_CHECK}: the oldest message is checked first and
+     *       the expiry monitor is only invoked when that message has expired.</li>
+     *   <li>Otherwise: the expiry monitor is invoked directly.</li>
+     * </ul>
+     *
+     * @param messageTTLInSeconds message time-to-live in seconds
+     * @return a future holding the expiry monitor's result, or {@code false} when nothing was attempted
+     */
+    @Override
+    public CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds) {
+        final long pendingEntries = cursor.getNumberOfEntriesInBacklog(false);
+        if (pendingEntries == 0) {
+            return CompletableFuture.completedFuture(false);
+        }
+        if (pendingEntries >= MINIMUM_BACKLOG_FOR_EXPIRY_CHECK) {
+            return expiryMonitor.expireMessagesAsync(messageTTLInSeconds);
+        }
+        // Small backlog: only run the expiry monitor when the oldest message is already past its TTL.
+        return topic.isOldestMessageExpiredAsync(cursor, messageTTLInSeconds)
+                .thenCompose(expired -> expired
+                        ? expiryMonitor.expireMessagesAsync(messageTTLInSeconds)
+                        : CompletableFuture.completedFuture(false));
+    }
+
@Override
public boolean expireMessages(Position position) {
return expiryMonitor.expireMessages(position);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 7568c612e14..5f3f9f3aba4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1236,6 +1236,26 @@ public class PersistentSubscription extends
AbstractSubscription {
return expiryMonitor.expireMessages(messageTTLInSeconds);
}
+    /**
+     * Asynchronously expires messages of this subscription that are older than the given TTL.
+     *
+     * <ul>
+     *   <li>Empty backlog: completes with {@code false} immediately.</li>
+     *   <li>A consumer is connected and the backlog is below {@code MINIMUM_BACKLOG_FOR_EXPIRY_CHECK}:
+     *       the oldest message is checked first and expiry only runs when that message has expired.</li>
+     *   <li>Otherwise: the expiry monitor is invoked directly.</li>
+     * </ul>
+     *
+     * <p>{@code lastExpireTimestamp} is refreshed on every path that actually starts an expiry run.
+     *
+     * @param messageTTLInSeconds message time-to-live in seconds
+     * @return a future holding the expiry monitor's result, or {@code false} when nothing was attempted
+     */
+    @Override
+    public CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds) {
+        long backlog = getNumberOfEntriesInBacklog(false);
+        if (backlog == 0) {
+            return CompletableFuture.completedFuture(false);
+        }
+        if (dispatcher != null && dispatcher.isConsumerConnected() && backlog < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK) {
+            return topic.isOldestMessageExpiredAsync(cursor, messageTTLInSeconds)
+                    .thenCompose(oldestMsgExpired -> {
+                        if (!oldestMsgExpired) {
+                            // Almost caught-up connected subscription: nothing to expire yet.
+                            return CompletableFuture.completedFuture(false);
+                        }
+                        this.lastExpireTimestamp = System.currentTimeMillis();
+                        return expiryMonitor.expireMessagesAsync(messageTTLInSeconds);
+                    });
+        }
+        // Record the run here as well; previously only the small-backlog branch updated the timestamp.
+        this.lastExpireTimestamp = System.currentTimeMillis();
+        return expiryMonitor.expireMessagesAsync(messageTTLInSeconds);
+    }
+
@Override
public boolean expireMessages(Position position) {
this.lastExpireTimestamp = System.currentTimeMillis();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d8cbc0cb453..51553d9dcae 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -31,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
+import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
@@ -2122,13 +2123,13 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (!isCompactionSubscription(sub.getName())
&& (additionalSystemCursorNames.isEmpty()
||
!additionalSystemCursorNames.contains(sub.getName()))) {
- sub.expireMessages(messageTtlInSeconds);
+ sub.expireMessagesAsync(messageTtlInSeconds);
}
});
replicators.forEach((__, replicator)
- -> ((PersistentReplicator)
replicator).expireMessages(messageTtlInSeconds));
+ -> ((PersistentReplicator)
replicator).expireMessagesAsync(messageTtlInSeconds));
shadowReplicators.forEach((__, replicator)
- -> ((PersistentReplicator)
replicator).expireMessages(messageTtlInSeconds));
+ -> ((PersistentReplicator)
replicator).expireMessagesAsync(messageTtlInSeconds));
}
}
@@ -3909,6 +3910,40 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return isOldestMessageExpired;
}
+ public CompletableFuture<Boolean>
isOldestMessageExpiredAsync(ManagedCursor cursor, int messageTTLInSeconds) {
+ CompletableFuture<Boolean> res = new CompletableFuture<>();
+ cursor.asyncGetNthEntry(1, IndividualDeletedEntries.Include, new
AsyncCallbacks.ReadEntryCallback() {
+
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ long entryTimestamp = 0;
+ try {
+ entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
+ res.complete(MessageImpl.isEntryExpired(
+ (int) (messageTTLInSeconds *
MESSAGE_EXPIRY_THRESHOLD), entryTimestamp));
+ } catch (IOException e) {
+ log.warn("[{}] [{}] Error while getting the oldest
message", topic, cursor.toString(), e);
+ res.complete(false);
+ }
+
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException e, Object ctx) {
+ if
(brokerService.pulsar().getConfiguration().isAutoSkipNonRecoverableData()
+ && e instanceof NonRecoverableLedgerException) {
+ // NonRecoverableLedgerException means the ledger or entry
can't be read anymore.
+ // if AutoSkipNonRecoverableData is set to true, just
return true here.
+ res.complete(true);
+ } else {
+ log.warn("[{}] [{}] Error while getting the oldest
message", topic, cursor.toString(), e);
+ res.complete(false);
+ }
+ }
+ }, null);
+ return res;
+ }
+
/**
* Clears backlog for all cursors in the topic.
*
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 5f30389fb11..4dc10671e6a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -449,6 +449,62 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
}
+ /**
+ * Verifies that {@code expireMessagesAsync} does not get stuck when entries of deleted
+ * ledgers can no longer be read.
+ *
+ * <p>Writes 10 entries at 2 entries per ledger with auto-skip of non-recoverable data
+ * enabled, deletes the first three ledgers through {@code bkc}, waits for the 1-second TTL
+ * and then expects the expiry to report {@code true} and the mark-delete position to reach
+ * the last entry of the ledger that precedes the empty, newly opened one.
+ */
+ @Test
+ void testMessageExpiryAsyncWithTimestampNonRecoverableException() throws
Exception {
+
+ final String ledgerAndCursorName =
"testPersistentMessageExpiryWithNonRecoverableLedgers";
+ final int entriesPerLedger = 2;
+ final int totalEntries = 10;
+ final int ttlSeconds = 1;
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionSizeInMB(10);
+ config.setMaxEntriesPerLedger(entriesPerLedger);
+ config.setRetentionTime(1, TimeUnit.HOURS);
+ config.setAutoSkipNonRecoverableData(true);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
+
+ for (int i = 0; i < totalEntries; i++) {
+ ledger.addEntry(createMessageWrittenToLedger("msg" + i));
+ }
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(ledger.getState(),
ManagedLedgerImpl.State.LedgerOpened));
+
+ List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
+ LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
+ // The `lastLedgerInfo` should be newly opened, and it does not
contain any entries.
+ // Please refer to: https://github.com/apache/pulsar/pull/22034
+ assertEquals(lastLedgerInfo.getEntries(), 0);
+ assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1);
+
+ // Wait for the TTL to elapse so the entries written above should all be eligible for expiry.
+ Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds));
+
+ bkc.deleteLedger(ledgers.get(0).getLedgerId());
+ bkc.deleteLedger(ledgers.get(1).getLedgerId());
+ bkc.deleteLedger(ledgers.get(2).getLedgerId());
+
+ PersistentTopic mock = mockPersistentTopic("topicname");
+
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+ assertTrue(monitor.expireMessagesAsync(ttlSeconds).get());
+ Awaitility.await().untilAsserted(() -> {
+ Position markDeletePosition = c1.getMarkDeletedPosition();
+ // The markDeletePosition points to the last entry of the previous
ledger in lastLedgerInfo.
+ assertEquals(markDeletePosition.getLedgerId(),
lastLedgerInfo.getLedgerId() - 1);
+ assertEquals(markDeletePosition.getEntryId(), entriesPerLedger -
1);
+ });
+
+ c1.close();
+ ledger.close();
+ factory.shutdown();
+
+ }
+
public void testFindMessageWithTimestampAutoSkipNonRecoverable() throws
Exception {
final String ledgerAndCursorName =
"testFindMessageWithTimestampAutoSkipNonRecoverable";
@@ -628,6 +684,7 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
when(mock.getLastPosition()).thenReturn(PositionFactory.EARLIEST);
BrokerService brokerService = mock(BrokerService.class);
doReturn(brokerService).when(mock).getBrokerService();
+ doReturn(executor).when(mock).getOrderedExecutor();
PulsarService pulsarService = mock(PulsarService.class);
doReturn(pulsarService).when(brokerService).pulsar();
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
@@ -672,6 +729,43 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
Assert.assertNull(throwableAtomicReference.get());
}
+ /**
+ * Runs {@code expireMessagesAsync} on a cursor whose entries carry a publish timestamp 10 days
+ * in the future and have all been acknowledged up to the last confirmed entry (7 entries over
+ * 2 ledgers), then checks that the backlog is empty and that the monitor's mark-delete callback
+ * never reported a failure.
+ */
+ @Test
+ public void testCheckExpiryAsyncByLedgerClosureTimeWithAckUnclosedLedger()
throws Throwable {
+ final String ledgerAndCursorName =
"testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
+ int maxTTLSeconds = 1;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(5);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
+ // set client clock to 10 days later
+ long incorrectPublishTimestamp = System.currentTimeMillis() +
TimeUnit.DAYS.toMillis(10);
+ for (int i = 0; i < 7; i++) {
+ ledger.addEntry(createMessageWrittenToLedger("msg" + i,
incorrectPublishTimestamp));
+ }
+ assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+ PersistentTopic mock = mockPersistentTopic("topicname");
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+ AsyncCallbacks.MarkDeleteCallback markDeleteCallback =
+ (AsyncCallbacks.MarkDeleteCallback) spy(
+ FieldUtils.readDeclaredField(monitor,
"markDeleteCallback", true));
+ FieldUtils.writeField(monitor, "markDeleteCallback",
markDeleteCallback, true);
+
+ AtomicReference<Throwable> throwableAtomicReference = new
AtomicReference<>();
+ Mockito.doAnswer(invocation -> {
+ ManagedLedgerException argument = invocation.getArgument(0,
ManagedLedgerException.class);
+ throwableAtomicReference.set(argument);
+ return invocation.callRealMethod();
+ }).when(markDeleteCallback).markDeleteFailed(any(), any());
+
+ Position position = ledger.getLastConfirmedEntry();
+ c1.markDelete(position);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
+ monitor.expireMessagesAsync(maxTTLSeconds).get();
+ assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
+
+ Assert.assertNull(throwableAtomicReference.get());
+ }
+
@Test
void testMessageExpiryWithPosition() throws Exception {
final String ledgerAndCursorName =
"testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 5b5e4328041..3bcfcee8e2a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -669,6 +669,36 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertEquals(status.getReplicationBacklog(), 0);
}
+ @Test(timeOut = 30000)
+ public void testReplicatorExpireMsgAsync() throws Exception {
+
+ // Verifies expireMessagesAsync completes after clearBacklog and the replication backlog stays 0
+ SortedSet<String> testDests = new TreeSet<>();
+
+ final TopicName dest = TopicName
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic"));
+ testDests.add(dest.toString());
+
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest);
+
+ @Cleanup
+ MessageConsumer consumer1 = new MessageConsumer(url3, dest);
+
+ // Produce from cluster1 and consume from the rest
+ producer1.produce(2);
+ PersistentTopic topic = (PersistentTopic)
pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
+ PersistentReplicator replicator = (PersistentReplicator) spy(
+
topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0)));
+ replicator.readEntriesFailed(new
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
+ replicator.clearBacklog().get();
+ Thread.sleep(100);
+ replicator.updateRates(); // for code-coverage
+ replicator.expireMessagesAsync(1).get(); // for code-coverage
+ ReplicatorStats status = replicator.computeStats();
+ assertEquals(status.getReplicationBacklog(), 0);
+ }
+
@Test(timeOut = 30000)
public void testResetReplicatorSubscriptionPosition() throws Exception {