This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 90a70db6fa8 [fix][meta] Use `getChildrenFromStore` to read children data to avoid lost data (#24665)
90a70db6fa8 is described below
commit 90a70db6fa81f887f65f60194fef3b36438433cb
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Aug 28 10:01:11 2025 +0800
[fix][meta] Use `getChildrenFromStore` to read children data to avoid lost data (#24665)
---
.../java/org/apache/pulsar/metadata/api/MetadataStore.java | 14 ++++++++++++++
.../bookkeeper/AbstractHierarchicalLedgerManager.java | 8 +++++---
.../bookkeeper/LegacyHierarchicalLedgerRangeIterator.java | 12 ++++++++----
.../bookkeeper/LongHierarchicalLedgerRangeIterator.java | 2 +-
.../apache/pulsar/metadata/impl/AbstractMetadataStore.java | 2 --
.../pulsar/metadata/impl/FaultInjectionMetadataStore.java | 10 ++++++++++
.../apache/pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +-
.../impl/batching/AbstractBatchedMetadataStore.java | 2 +-
.../pulsar/metadata/impl/oxia/OxiaMetadataStore.java | 2 +-
.../pulsar/metadata/impl/MetadataStoreFactoryImplTest.java | 2 +-
10 files changed, 42 insertions(+), 14 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index f0ec8f52375..8383be4d5b3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -77,6 +77,20 @@ public interface MetadataStore extends AutoCloseable {
*/
CompletableFuture<List<String>> getChildren(String path);
+
+ /**
+ * Return all the nodes (lexicographically sorted) that are children to
the specific path.
+ *
+ * If the path itself does not exist, it will return an empty list.
+ *
+ * This method is similar to {@link #getChildren(String)}, but it attempts
to read directly from
+ * the underlying store.
+ *
+ * @param path
+ * the path of the key to get from the store
+ * @return a future to track the async request
+ */
+ CompletableFuture<List<String>> getChildrenFromStore(String path);
/**
* Read whether a specific path exists.
*
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
index 4db7f4798c3..bd1c09ddf0f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java
@@ -68,7 +68,8 @@ abstract class AbstractHierarchicalLedgerManager {
final AsyncCallback.VoidCallback finalCb, final Object context,
final int successRc, final int failureRc) {
- store.getChildren(path)
+ store.sync(path)
+ .thenCompose(__ -> store.getChildrenFromStore(path))
.thenAccept(levelNodes -> {
if (levelNodes.isEmpty()) {
finalCb.processResult(successRc, null, context);
@@ -162,7 +163,7 @@ abstract class AbstractHierarchicalLedgerManager {
* Process ledgers in a single zk node.
*
* <p>
- * for each ledger found in this zk node, processor#process(ledgerId) will
be triggerred
+ * for each ledger found in this zk node, processor#process(ledgerId) will
be triggered
* to process a specific ledger. after all ledgers has been processed, the
finalCb will
* be called with provided context object. The RC passed to finalCb is
decided by :
* <ul>
@@ -188,7 +189,8 @@ abstract class AbstractHierarchicalLedgerManager {
final String path, final
BookkeeperInternalCallbacks.Processor<Long> processor,
final AsyncCallback.VoidCallback finalCb, final Object ctx,
final int successRc, final int failureRc) {
- store.getChildren(path)
+ store.sync(path)
+ .thenCompose(__ -> store.getChildrenFromStore(path))
.thenAccept(ledgerNodes -> {
Set<Long> activeLedgers =
HierarchicalLedgerUtils.ledgerListToSet(ledgerNodes,
ledgerRootPath, path);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
index 37e6dc836f2..cd9533843cc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java
@@ -69,7 +69,7 @@ public class LegacyHierarchicalLedgerRangeIterator implements
LedgerManager.Ledg
* Iterate next level1 znode.
*
* @return false if have visited all level1 nodes
- * @throws InterruptedException/KeeperException if error occurs reading
zookeeper children
+ * @throws InterruptedException/ExecutionException/TimeoutException if
error occurs reading zookeeper children
*/
private boolean nextL1Node() throws ExecutionException,
InterruptedException, TimeoutException {
l2NodesIter = null;
@@ -83,7 +83,9 @@ public class LegacyHierarchicalLedgerRangeIterator implements
LedgerManager.Ledg
if (!isLedgerParentNode(curL1Nodes)) {
continue;
}
- List<String> l2Nodes = store.getChildren(ledgersRoot + "/" +
curL1Nodes)
+ String path = ledgersRoot + "/" + curL1Nodes;
+ List<String> l2Nodes = store.sync(path)
+ .thenCompose(__ -> store.getChildrenFromStore(path))
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
l2NodesIter = l2Nodes.iterator();
if (!l2NodesIter.hasNext()) {
@@ -99,7 +101,8 @@ public class LegacyHierarchicalLedgerRangeIterator
implements LedgerManager.Ledg
boolean hasMoreElements = false;
try {
if (l1NodesIter == null) {
- List<String> l1Nodes = store.getChildren(ledgersRoot)
+ List<String> l1Nodes = store.sync(ledgersRoot)
+ .thenCompose(__ ->
store.getChildrenFromStore(ledgersRoot))
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
l1NodesIter = l1Nodes.iterator();
hasMoreElements = nextL1Node();
@@ -162,7 +165,8 @@ public class LegacyHierarchicalLedgerRangeIterator
implements LedgerManager.Ledg
String nodePath = nodeBuilder.toString();
List<String> ledgerNodes = null;
try {
- ledgerNodes =
store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ ledgerNodes = store.sync(nodePath).thenCompose(__ ->
store.getChildrenFromStore(nodePath))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
throw new IOException("Error when get child nodes from zk", e);
} catch (InterruptedException e) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
index 3b32916e6e7..92d3dcbe8f2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java
@@ -59,7 +59,7 @@ class LongHierarchicalLedgerRangeIterator implements
LedgerManager.LedgerRangeIt
*/
List<String> getChildrenAt(String path) throws IOException {
try {
- return store.getChildren(path)
+ return store.sync(path).thenCompose(__ ->
store.getChildrenFromStore(path))
.get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT,
TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
if (log.isDebugEnabled()) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 33ef44d7599..506df8b631d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -87,8 +87,6 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
- protected abstract CompletableFuture<List<String>>
getChildrenFromStore(String path);
-
protected abstract CompletableFuture<Boolean> existsFromStore(String path);
protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry
openTelemetry) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 91cd3754d69..de0191c7594 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -94,6 +94,16 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
return store.getChildren(path);
}
+ @Override
+ public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ Optional<MetadataStoreException> ex =
programmedFailure(OperationType.GET_CHILDREN, path);
+ if (ex.isPresent()) {
+ return FutureUtil.failedFuture(ex.get());
+ }
+
+ return store.getChildrenFromStore(path);
+ }
+
@Override
public CompletableFuture<Boolean> exists(String path) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.EXISTS, path);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 20e3c4c2b27..752fc7153cf 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -404,7 +404,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
@Override
- protected CompletableFuture<List<String>> getChildrenFromStore(String
path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path) {
if (log.isDebugEnabled()) {
log.debug("getChildrenFromStore.path={},instanceId={}", path,
instanceId);
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 4275920d7f9..865213643c3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -144,7 +144,7 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
}
@Override
- protected final CompletableFuture<List<String>>
getChildrenFromStore(String path) {
+ public final CompletableFuture<List<String>> getChildrenFromStore(String
path) {
OpGetChildren op = new OpGetChildren(path);
enqueue(readOps, op);
return op.getFuture();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index e0415f0ef52..c7e1ce24b08 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -147,7 +147,7 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
}
@Override
- protected CompletableFuture<List<String>> getChildrenFromStore(String
path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path) {
var pathWithSlash = path + "/";
return client
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index ec902d51bb0..b9abaece9e4 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -100,7 +100,7 @@ public class MetadataStoreFactoryImplTest {
}
@Override
- protected CompletableFuture<List<String>> getChildrenFromStore(String
path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String
path) {
return null;
}