This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8332499ab2d [improve][ml] Clean up managed-ledger code: deduplicate logic and use shared utilities (#25298)
8332499ab2d is described below

commit 8332499ab2d1b2a5863fc495e312d2d29c162c70
Author: Penghui Li <[email protected]>
AuthorDate: Mon Mar 9 05:48:27 2026 -0700

    [improve][ml] Clean up managed-ledger code: deduplicate logic and use shared utilities (#25298)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  6 ++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 37 +++++++++-------------
 .../mledger/impl/NullLedgerOffloader.java          | 13 +++-----
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 16 +++-------
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 26 +++++++--------
 5 files changed, 38 insertions(+), 60 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 97333fbb1e3..b8b5b78a62c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -1050,8 +1050,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                             .handle((result, ex) -> {
                                 if (ex != null) {
                                     int rc = BKException.getExceptionCode(ex);
-                                    if (rc == 
BKException.Code.NoSuchLedgerExistsOnMetadataServerException
-                                        || rc == 
BKException.Code.NoSuchLedgerExistsException) {
+                                    if 
(Errors.isNoSuchLedgerExistsException(rc)) {
                                         log.info("Ledger {} does not exist, 
ignoring", li.ledgerId);
                                         return null;
                                     }
@@ -1092,8 +1091,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                     .handle((result, ex) -> {
                         if (ex != null) {
                             int rc = BKException.getExceptionCode(ex);
-                            if (rc == 
BKException.Code.NoSuchLedgerExistsOnMetadataServerException
-                                    || rc == 
BKException.Code.NoSuchLedgerExistsException) {
+                            if (Errors.isNoSuchLedgerExistsException(rc)) {
                                 log.info("Ledger {} does not exist, ignoring", 
cursor.cursorsLedgerId);
                                 return null;
                             }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ce0bf59edc7..0606c6fb074 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -48,7 +48,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -4515,22 +4514,11 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         }
     }
 
-    private static boolean isLedgerNotExistException(int rc) {
-        switch (rc) {
-            case Code.NoSuchLedgerExistsException:
-            case Code.NoSuchLedgerExistsOnMetadataServerException:
-                return true;
-
-            default:
-                return false;
-        }
-    }
-
     public static ManagedLedgerException createManagedLedgerException(int 
bkErrorCode) {
         if (bkErrorCode == BKException.Code.TooManyRequestsException) {
             return new TooManyRequestsException("Too many request error from 
bookies");
         } else if (isBkErrorNotRecoverable(bkErrorCode)) {
-            if (isLedgerNotExistException(bkErrorCode)) {
+            if (isNoSuchLedgerExistsException(bkErrorCode)) {
                 return new 
LedgerNotExistException(BKException.getMessage(bkErrorCode));
             } else {
                 return new 
NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
@@ -4541,16 +4529,21 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
     }
 
     public static ManagedLedgerException 
createManagedLedgerException(Throwable t) {
-        if (t instanceof org.apache.bookkeeper.client.api.BKException) {
-            return 
createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) 
t).getCode());
-        } else if (t instanceof ManagedLedgerException) {
-            return (ManagedLedgerException) t;
-        } else if (t instanceof CompletionException
-                && !(t.getCause() instanceof CompletionException) /* check to 
avoid stackoverlflow */) {
-            return createManagedLedgerException(t.getCause());
+        if (t == null) {
+            return new ManagedLedgerException("Unknown exception");
+        }
+        Throwable cause = FutureUtil.unwrapCompletionException(t);
+        if (cause == null) {
+            log.error("Exception with null cause for ManagedLedgerException.", 
t);
+            return new ManagedLedgerException("Unknown exception", t);
+        }
+        if (cause instanceof org.apache.bookkeeper.client.api.BKException) {
+            return 
createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) 
cause).getCode());
+        } else if (cause instanceof ManagedLedgerException) {
+            return (ManagedLedgerException) cause;
         } else {
-            log.error("Unknown exception for ManagedLedgerException.", t);
-            return new ManagedLedgerException("Other exception", t);
+            log.error("Unknown exception for ManagedLedgerException.", cause);
+            return new ManagedLedgerException("Other exception", cause);
         }
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index fe646bc82e5..807a703d540 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
 
 /**
  * Null implementation that throws an error on any invokation.
@@ -40,25 +41,19 @@ public class NullLedgerOffloader implements LedgerOffloader 
{
     public CompletableFuture<Void> offload(ReadHandle ledger,
                                            UUID uid,
                                            Map<String, String> extraMetadata) {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
-        return promise;
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
     @Override
     public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
                                                        Map<String, String> 
offloadDriverMetadata) {
-        CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
-        return promise;
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
                                                    Map<String, String> 
offloadDriverMetadata) {
-        CompletableFuture<Void> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
-        return promise;
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
     }
 
     @Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index bcd87e62629..bf452166920 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -288,16 +288,7 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable, Managed
             }
         } else {
             updateLatency();
-            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
-            if (cb != null) {
-                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
-                ml.notifyCursors();
-                ml.notifyWaitingEntryCallBacks();
-                ReferenceCountUtil.release(data);
-                this.recycle();
-            } else {
-                ReferenceCountUtil.release(data);
-            }
+            completeAdd(lastEntry, ctx);
         }
     }
 
@@ -314,10 +305,13 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable, Managed
 
         ml.ledgerClosed(lh);
         updateLatency();
+        completeAdd(PositionFactory.create(lh.getId(), entryId), ctx);
+    }
 
+    private void completeAdd(Position pos, Object ctx) {
         AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
         if (cb != null) {
-            cb.addComplete(PositionFactory.create(lh.getId(), entryId), 
data.asReadOnly(), ctx);
+            cb.addComplete(pos, data.asReadOnly(), ctx);
             ml.notifyCursors();
             ml.notifyWaitingEntryCallBacks();
             ReferenceCountUtil.release(data);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 4dc60b6434f..352ac341e56 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -203,15 +203,7 @@ public class RangeEntryCacheImpl implements EntryCache {
             return;
         }
 
-        Pair<Integer, Long> removed = entries.removeRange(firstPosition, 
lastPosition, false);
-        int entriesRemoved = removed.getLeft();
-        long sizeRemoved = removed.getRight();
-        if (log.isTraceEnabled()) {
-            log.trace("[{}] Invalidated entries up to {} - Entries removed: {} 
- Size removed: {}", ml.getName(),
-                    lastPosition, entriesRemoved, sizeRemoved);
-        }
-
-        manager.entriesRemoved(sizeRemoved, entriesRemoved);
+        removeRangeAndNotify(firstPosition, lastPosition);
     }
 
     @Override
@@ -219,16 +211,22 @@ public class RangeEntryCacheImpl implements EntryCache {
         final Position firstPosition = PositionFactory.create(ledgerId, 0);
         final Position lastPosition = PositionFactory.create(ledgerId + 1, 0);
 
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Invalidating all entries on ledger {}", 
ml.getName(), ledgerId);
+        }
+        removeRangeAndNotify(firstPosition, lastPosition);
+        pendingReadsManager.invalidateLedger(ledgerId);
+    }
+
+    private void removeRangeAndNotify(Position firstPosition, Position 
lastPosition) {
         Pair<Integer, Long> removed = entries.removeRange(firstPosition, 
lastPosition, false);
         int entriesRemoved = removed.getLeft();
         long sizeRemoved = removed.getRight();
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Invalidated all entries on ledger {} - Entries 
removed: {} - Size removed: {}",
-                    ml.getName(), ledgerId, entriesRemoved, sizeRemoved);
+        if (log.isTraceEnabled()) {
+            log.trace("[{}] Invalidated entries in range [{}, {}] - Entries 
removed: {} - Size removed: {}",
+                    ml.getName(), firstPosition, lastPosition, entriesRemoved, 
sizeRemoved);
         }
-
         manager.entriesRemoved(sizeRemoved, entriesRemoved);
-        pendingReadsManager.invalidateLedger(ledgerId);
     }
 
     @Override

Reply via email to