This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 84205ebd849 [improve][broker]Find the target position at most once,
during expiring messages for a topic, even though there are many subscriptions
(#24622)
84205ebd849 is described below
commit 84205ebd849479edac2c6533ea9259091e2e5bed
Author: fengyubiao <[email protected]>
AuthorDate: Thu Aug 28 16:44:44 2025 +0800
[improve][broker]Find the target position at most once, during expiring
messages for a topic, even though there are many subscriptions (#24622)
---
.../bookkeeper/mledger/impl/OpFindNewest.java | 3 +
.../pulsar/broker/service/MessageExpirer.java | 10 +++
.../persistent/PersistentMessageExpiryMonitor.java | 15 +++++
.../broker/service/persistent/PersistentTopic.java | 76 ++++++++++++++++++++--
.../impl}/PersistentMessageExpiryMonitorTest.java | 71 ++++++++++++++++++--
.../broker/service/PersistentTopicE2ETest.java | 4 +-
.../pulsar/broker/stats/PrometheusMetricsTest.java | 7 +-
7 files changed, 171 insertions(+), 15 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 8f86cb33ae8..31c1a090b50 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -250,6 +250,9 @@ class OpFindNewest implements ReadEntryCallback {
return nextPosition;
}
+ /**
+ * Find the largest entry that matches the given predicate.
+ */
public void find() {
if (cursor != null ? cursor.hasMoreEntries(searchPosition) :
ledger.hasMoreEntries(searchPosition)) {
ledger.asyncReadEntry(searchPosition, this, null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
index 3008717a3df..60cf5bd0523 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java
@@ -25,9 +25,19 @@ import
org.apache.pulsar.common.classification.InterfaceStability;
@InterfaceStability.Evolving
public interface MessageExpirer {
+ /**
+ * Mark delete the largest position that is less than or equal to the
{@code position}.
+ */
boolean expireMessages(Position position);
+ /**
+ * Mark delete the largest message whose publish timestamp is less than the
result of the expression
+ * "{@link System#currentTimeMillis()} - {@code messageTTLInSeconds}".
+ */
boolean expireMessages(int messageTTLInSeconds);
+ /**
+ * Async implementation of {@link #expireMessages(int)}.
+ */
CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index e1f5ade4ee1..9191080d671 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -35,6 +35,7 @@ import
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistExcept
import
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.client.impl.MessageImpl;
@@ -140,6 +141,20 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback, Messag
public boolean expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
Position topicLastPosition = this.topic.getLastPosition();
+ ManagedLedger managedLedger = cursor.getManagedLedger();
+ if (managedLedger instanceof ManagedLedgerImpl ml) {
+ // Confirm the position is valid.
+ Optional<MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgerInfoOptional =
+ ml.getOptionalLedgerInfo(messagePosition.getLedgerId());
+ if (ledgerInfoOptional.isPresent()) {
+ if (messagePosition.getEntryId() >= 0
+ && ledgerInfoOptional.get().getEntries() - 1 >=
messagePosition.getEntryId()) {
+ findEntryComplete(messagePosition, null);
+ return true;
+ }
+ }
+ }
+ // Fall back to the slower solution if the managed ledger is not an
instance of ManagedLedgerImpl, or the position could not be confirmed as valid.
if (topicLastPosition.compareTo(messagePosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore expire-message scheduled task,
given position {} is beyond "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 51553d9dcae..0e40894cbd1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.PositionBound;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.CursorInfo;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -2118,19 +2119,82 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public void checkMessageExpiry() {
int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
- if (messageTtlInSeconds != 0) {
+ if (messageTtlInSeconds <= 0) {
+ return;
+ }
+
+ ManagedLedger managedLedger = getManagedLedger();
+ if (managedLedger instanceof ManagedLedgerImpl ml) {
+ checkMessageExpiryWithSharedPosition(ml, messageTtlInSeconds);
+ } else {
+ // Fall back to the slower solution if the managed ledger is not an
instance of ManagedLedgerImpl: each
+ // subscription finds the position and handles expiry itself.
+ checkMessageExpiryWithoutSharedPosition(messageTtlInSeconds);
+ }
+ }
+
+ private void checkMessageExpiryWithoutSharedPosition(int
messageTtlInSeconds) {
+ subscriptions.forEach((__, sub) -> {
+ if (!isCompactionSubscription(sub.getName())
+ && (additionalSystemCursorNames.isEmpty()
+ || !additionalSystemCursorNames.contains(sub.getName()))) {
+ sub.expireMessagesAsync(messageTtlInSeconds);
+ }
+ });
+ replicators.forEach((__, replicator)
+ -> ((PersistentReplicator)
replicator).expireMessagesAsync(messageTtlInSeconds));
+ shadowReplicators.forEach((__, replicator)
+ -> ((PersistentReplicator)
replicator).expireMessagesAsync(messageTtlInSeconds));
+ }
+
+ private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml,
int messageTtlInSeconds) {
+ // Find the target position once, then expire messages for all subscriptions
and replicators.
+ ManagedCursor cursor =
ml.getCursors().getCursorWithOldestPosition().getCursor();
+ PersistentMessageFinder finder = new PersistentMessageFinder(topic,
cursor, brokerService.getPulsar()
+
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
+ // Find the target position.
+ long expiredMessageTimestamp = System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(messageTtlInSeconds);
+ CompletableFuture<Position> positionToMarkDelete = new
CompletableFuture<>();
+ finder.findMessages(expiredMessageTimestamp, new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionToMarkDelete.complete(position);
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition,
+ Object ctx) {
+ log.error("[{}] Error finding expired position, failed reading
position is {}", topic,
+ failedReadPosition.orElse(null), exception);
+ // The error has already been logged here, so complete with null to skip
logging it again in the next step.
+ positionToMarkDelete.complete(null);
+ }
+ });
+ positionToMarkDelete.thenAccept(position -> {
+ if (position == null) {
+ // Nothing needs to be expired.
+ return;
+ }
+ // Expire messages by position, which is more efficient.
subscriptions.forEach((__, sub) -> {
if (!isCompactionSubscription(sub.getName())
&& (additionalSystemCursorNames.isEmpty()
- ||
!additionalSystemCursorNames.contains(sub.getName()))) {
- sub.expireMessagesAsync(messageTtlInSeconds);
+ ||
!additionalSystemCursorNames.contains(sub.getName()))) {
+ // The variable "position" is the position to mark delete.
+ // The method "expireMessages(position)" mark deletes
the target position if the
+ // position is valid; otherwise, it mark deletes the
previous valid position.
+ // So we pass it the position to be mark deleted.
+ sub.expireMessages(position);
}
});
replicators.forEach((__, replicator)
- -> ((PersistentReplicator)
replicator).expireMessagesAsync(messageTtlInSeconds));
+ -> ((PersistentReplicator)
replicator).expireMessages(position));
shadowReplicators.forEach((__, replicator)
- -> ((PersistentReplicator)
replicator).expireMessagesAsync(messageTtlInSeconds));
- }
+ -> ((PersistentReplicator)
replicator).expireMessages(position));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to expire messages by position", topic, ex);
+ return null;
+ });
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java
similarity index 63%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
rename to
pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java
index 5535561a5fa..de7d87e4293 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java
@@ -16,28 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service.persistent;
+package org.apache.bookkeeper.mledger.impl;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertEquals;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
+import
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.awaitility.Awaitility;
-import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -59,6 +62,11 @@ public class PersistentMessageExpiryMonitorTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @Override
+ protected void doInitConf() throws Exception {
+ conf.setMessageExpiryCheckIntervalInMinutes(60);
+ }
+
/***
* Confirm the anti-concurrency mechanism
"expirationCheckInProgressUpdater" works.
*/
@@ -76,7 +84,7 @@ public class PersistentMessageExpiryMonitorTest extends
ProducerConsumerBase {
(PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl)
ml.getCursors().get(cursorName);
- ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+ ManagedCursorImpl spyCursor = spy(cursor);
// Make the mark-deleting delay.
CountDownLatch firstFindingCompleted = new CountDownLatch(1);
@@ -138,4 +146,57 @@ public class PersistentMessageExpiryMonitorTest extends
ProducerConsumerBase {
producer.close();
admin.topics().delete(topicName);
}
+
+ /***
+ * Verify that the position-finding task executes only once for multiple
subscriptions of a topic.
+ */
+ @Test(invocationCount = 2)
+ void testTopicExpireMessages() throws Exception {
+ // Create topic.
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ final String cursorName1 = "s1";
+ final String cursorName2 = "s2";
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ admin.topics().createSubscriptionAsync(topicName, cursorName1,
MessageId.earliest);
+ admin.topics().createSubscriptionAsync(topicName, cursorName2,
MessageId.earliest);
+ admin.topicPolicies().setMessageTTL(topicName, 1);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(1,
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get().intValue());
+ });
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ ml.getConfig().setMaxEntriesPerLedger(2);
+ ml.getConfig().setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+ long firstLedger = ml.currentLedger.getId();
+ System.out.println("maxEntriesPerLedger 1 : " +
ml.getConfig().getMaxEntriesPerLedger());
+ // Trigger 3 ledgers creation.
+ for (int i = 0; i < 5; i++) {
+ producer.send("" + i);
+ Thread.sleep(100);
+ }
+ System.out.println("maxEntriesPerLedger 2 : " +
ml.getConfig().getMaxEntriesPerLedger());
+ assertEquals(3, ml.getLedgersInfo().size());
+ // Inject a spy to count accesses to the first ledger.
+ AtomicInteger accessedCount = new AtomicInteger();
+ ReadHandle readHandle = ml.getLedgerHandle(firstLedger).get();
+ ReadHandle spyReadHandle = spy(readHandle);
+ doAnswer(invocationOnMock -> {
+ long startEntry = (long) invocationOnMock.getArguments()[0];
+ if (startEntry == 0) {
+ accessedCount.incrementAndGet();
+ }
+ return invocationOnMock.callRealMethod();
+ }).when(spyReadHandle).readAsync(anyLong(), anyLong());
+ ml.ledgerCache.put(firstLedger,
CompletableFuture.completedFuture(spyReadHandle));
+ // Verify: the first ledger will be accessed only once after expiry
for two subscriptions.
+ persistentTopic.checkMessageExpiry();
+ Thread.sleep(2000);
+ assertEquals(1, accessedCount.get());
+
+ // cleanup.
+ producer.close();
+ admin.topics().delete(topicName);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index c6fc9bd0eef..cf08dd48261 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1119,12 +1119,12 @@ public class PersistentTopicE2ETest extends
BrokerTestBase {
rolloverPerIntervalStats();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
- Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
+ Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs - 1));
runMessageExpiryCheck();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
- Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs / 2));
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1 + messageTTLSecs / 2));
runMessageExpiryCheck();
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 50e5d34145c..4a99576d0f5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -817,11 +817,14 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
p2.close();
// Let the message expire
for (String topic : topicList) {
+ // The TTL value cannot be set to a negative value; the minimum
value is 1.
PersistentTopic persistentTopic = (PersistentTopic)
pulsar.getBrokerService()
.getTopicIfExists(topic).get().get();
-
persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
-
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(-1);
+
persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(1);
+
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(1);
}
+ // Wait 2 seconds to expire message.
+ Thread.sleep(2000);
pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);
//wait for checkMessageExpiry
PersistentSubscription sub = (PersistentSubscription)