This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e4431bc6bc8bc93848111472a1108d25ebf77b5d Author: Jiwei Guo <[email protected]> AuthorDate: Thu Aug 28 10:01:11 2025 +0800 [fix][meta] Use `getChildrenFromStore` to read children data to avoid lost data (#24665) (cherry picked from commit 90a70db6fa81f887f65f60194fef3b36438433cb) --- .../java/org/apache/pulsar/metadata/api/MetadataStore.java | 14 ++++++++++++++ .../bookkeeper/AbstractHierarchicalLedgerManager.java | 8 +++++--- .../bookkeeper/LegacyHierarchicalLedgerRangeIterator.java | 12 ++++++++---- .../bookkeeper/LongHierarchicalLedgerRangeIterator.java | 2 +- .../apache/pulsar/metadata/impl/AbstractMetadataStore.java | 2 -- .../pulsar/metadata/impl/FaultInjectionMetadataStore.java | 10 ++++++++++ .../apache/pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +- .../impl/batching/AbstractBatchedMetadataStore.java | 2 +- .../pulsar/metadata/impl/oxia/OxiaMetadataStore.java | 2 +- .../pulsar/metadata/impl/MetadataStoreFactoryImplTest.java | 2 +- 10 files changed, 42 insertions(+), 14 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index f0ec8f52375..8383be4d5b3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -77,6 +77,20 @@ public interface MetadataStore extends AutoCloseable { */ CompletableFuture<List<String>> getChildren(String path); + + /** + * Return all the nodes (lexicographically sorted) that are children to the specific path. + * + * If the path itself does not exist, it will return an empty list. + * + * This method is similar to {@link #getChildren(String)}, but it attempts to read directly from + * the underlying store. + * + * @param path + * the path of the key to get from the store + * @return a future to track the async request + */ + CompletableFuture<List<String>> getChildrenFromStore(String path); /** * Read whether a specific path exists. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java index 4db7f4798c3..bd1c09ddf0f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java @@ -68,7 +68,8 @@ abstract class AbstractHierarchicalLedgerManager { final AsyncCallback.VoidCallback finalCb, final Object context, final int successRc, final int failureRc) { - store.getChildren(path) + store.sync(path) + .thenCompose(__ -> store.getChildrenFromStore(path)) .thenAccept(levelNodes -> { if (levelNodes.isEmpty()) { finalCb.processResult(successRc, null, context); @@ -162,7 +163,7 @@ abstract class AbstractHierarchicalLedgerManager { * Process ledgers in a single zk node. * * <p> - * for each ledger found in this zk node, processor#process(ledgerId) will be triggerred + * for each ledger found in this zk node, processor#process(ledgerId) will be triggered * to process a specific ledger. after all ledgers has been processed, the finalCb will * be called with provided context object. The RC passed to finalCb is decided by : * <ul> @@ -188,7 +189,8 @@ abstract class AbstractHierarchicalLedgerManager { final String path, final BookkeeperInternalCallbacks.Processor<Long> processor, final AsyncCallback.VoidCallback finalCb, final Object ctx, final int successRc, final int failureRc) { - store.getChildren(path) + store.sync(path) + .thenCompose(__ -> store.getChildrenFromStore(path)) .thenAccept(ledgerNodes -> { Set<Long> activeLedgers = HierarchicalLedgerUtils.ledgerListToSet(ledgerNodes, ledgerRootPath, path); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java index 37e6dc836f2..cd9533843cc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java @@ -69,7 +69,7 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg * Iterate next level1 znode. * * @return false if have visited all level1 nodes - * @throws InterruptedException/KeeperException if error occurs reading zookeeper children + * @throws InterruptedException/ExecutionException/TimeoutException if error occurs reading zookeeper children */ private boolean nextL1Node() throws ExecutionException, InterruptedException, TimeoutException { l2NodesIter = null; @@ -83,7 +83,9 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg if (!isLedgerParentNode(curL1Nodes)) { continue; } - List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + curL1Nodes) + String path = ledgersRoot + "/" + curL1Nodes; + List<String> l2Nodes = store.sync(path) + .thenCompose(__ -> store.getChildrenFromStore(path)) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); l2NodesIter = l2Nodes.iterator(); if (!l2NodesIter.hasNext()) { @@ -99,7 +101,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg boolean hasMoreElements = false; try { if (l1NodesIter == null) { - List<String> l1Nodes = store.getChildren(ledgersRoot) + List<String> l1Nodes = store.sync(ledgersRoot) + .thenCompose(__ -> store.getChildrenFromStore(ledgersRoot)) .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); l1NodesIter = l1Nodes.iterator(); hasMoreElements = nextL1Node(); @@ -162,7 +165,8 @@ public class LegacyHierarchicalLedgerRangeIterator implements LedgerManager.Ledg String nodePath = nodeBuilder.toString(); List<String> ledgerNodes = null; try { - ledgerNodes = store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); + ledgerNodes = store.sync(nodePath).thenCompose(__ -> store.getChildrenFromStore(nodePath)) + .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS); } catch (ExecutionException | TimeoutException e) { throw new IOException("Error when get child nodes from zk", e); } catch (InterruptedException e) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java index 3b32916e6e7..92d3dcbe8f2 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java @@ -59,7 +59,7 @@ class LongHierarchicalLedgerRangeIterator implements LedgerManager.LedgerRangeIt */ List<String> getChildrenAt(String path) throws IOException { try { - return store.getChildren(path) + return store.sync(path).thenCompose(__ -> store.getChildrenFromStore(path)) .get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS); } catch (ExecutionException | TimeoutException e) { if (log.isDebugEnabled()) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 33ef44d7599..506df8b631d 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -87,8 +87,6 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co protected final AtomicBoolean isClosed = new AtomicBoolean(false); - protected abstract CompletableFuture<List<String>> getChildrenFromStore(String path); - protected abstract CompletableFuture<Boolean> existsFromStore(String path); protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java index 91cd3754d69..de0191c7594 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java @@ -94,6 +94,16 @@ public class FaultInjectionMetadataStore implements MetadataStoreExtended { return store.getChildren(path); } + @Override + public CompletableFuture<List<String>> getChildrenFromStore(String path) { + Optional<MetadataStoreException> ex = programmedFailure(OperationType.GET_CHILDREN, path); + if (ex.isPresent()) { + return FutureUtil.failedFuture(ex.get()); + } + + return store.getChildrenFromStore(path); + } + @Override public CompletableFuture<Boolean> exists(String path) { Optional<MetadataStoreException> ex = programmedFailure(OperationType.EXISTS, path); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 20e3c4c2b27..752fc7153cf 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -404,7 +404,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore { } @Override - protected CompletableFuture<List<String>> getChildrenFromStore(String path) { + public CompletableFuture<List<String>> getChildrenFromStore(String path) { if (log.isDebugEnabled()) { log.debug("getChildrenFromStore.path={},instanceId={}", path, instanceId); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index 4275920d7f9..865213643c3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -144,7 +144,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore } @Override - protected final CompletableFuture<List<String>> getChildrenFromStore(String path) { + public final CompletableFuture<List<String>> getChildrenFromStore(String path) { OpGetChildren op = new OpGetChildren(path); enqueue(readOps, op); return op.getFuture(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index e0415f0ef52..c7e1ce24b08 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -147,7 +147,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { } @Override - protected CompletableFuture<List<String>> getChildrenFromStore(String path) { + public CompletableFuture<List<String>> getChildrenFromStore(String path) { var pathWithSlash = path + "/"; return client diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java index ec902d51bb0..b9abaece9e4 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java @@ -100,7 +100,7 @@ public class MetadataStoreFactoryImplTest { } @Override - protected CompletableFuture<List<String>> getChildrenFromStore(String path) { + public CompletableFuture<List<String>> getChildrenFromStore(String path) { return null; }
