poorbarcode commented on code in PR #24622:
URL: https://github.com/apache/pulsar/pull/24622#discussion_r2271887525
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -2118,19 +2119,75 @@ private CompletableFuture<Void>
checkShadowReplication() {
@Override
public void checkMessageExpiry() {
int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
- if (messageTtlInSeconds != 0) {
+ if (messageTtlInSeconds <= 0) {
+ return;
+ }
+
+ // Fallback to the slower solution if managed ledger is not an
instance of ManagedLedgerImpl: each
+ // subscription find position and handle expiring itself.
+ ManagedLedger managedLedger = getManagedLedger();
+ if (!(managedLedger instanceof ManagedLedgerImpl ml)) {
+ subscriptionsCheckMessageExpiryEachOther(messageTtlInSeconds);
+ return;
+ }
+
+ // Find the target position at one time, then expire all subscriptions
and replicators.
+ ManagedCursor cursor =
ml.getCursors().getCursorWithOldestPosition().getCursor();
+ PersistentMessageFinder finder = new PersistentMessageFinder(topic,
cursor, brokerService.getPulsar()
+
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
+ // Find the target position.
+ long expiredMessageTimestamp = System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(messageTtlInSeconds);
+ CompletableFuture<Position> positionToMarkDelete = new
CompletableFuture<>();
+ finder.findMessages(expiredMessageTimestamp, new
AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ positionToMarkDelete.complete(position);
+ }
+
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition,
+ Object ctx) {
+ log.error("[{}] Error finding expired position, failed reading
position is {}", topic,
+ failedReadPosition.orElse(null), exception);
+ // Since we have logged the error, we can skip to print error
log at next step.
+ positionToMarkDelete.complete(null);
+ }
+ });
+ positionToMarkDelete.thenAccept(position -> {
+ if (position == null) {
+ // Nothing need to be expired.
+ return;
+ }
+ // Expire messages by position, which is more efficient.
subscriptions.forEach((__, sub) -> {
if (!isCompactionSubscription(sub.getName())
&& (additionalSystemCursorNames.isEmpty()
- ||
!additionalSystemCursorNames.contains(sub.getName()))) {
- sub.expireMessagesAsync(messageTtlInSeconds);
+ ||
!additionalSystemCursorNames.contains(sub.getName()))) {
+ sub.expireMessages(position);
}
});
replicators.forEach((__, replicator)
- -> ((PersistentReplicator)
replicator).expireMessagesAsync(messageTtlInSeconds));
+ -> ((PersistentReplicator)
replicator).expireMessages(position));
shadowReplicators.forEach((__, replicator)
- -> ((PersistentReplicator)
replicator).expireMessagesAsync(messageTtlInSeconds));
- }
+ -> ((PersistentReplicator)
replicator).expireMessages(position));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to expire messages by position", topic, ex);
+ return null;
+ });
+ }
+
+ private void subscriptionsCheckMessageExpiryEachOther(int
messageTtlInSeconds) {
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org