This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d3c615c494d [fix][broker] Fix race condition in
MetadataStoreCacheLoader causing inconsistent availableBroker list caching
(#24639)
d3c615c494d is described below
commit d3c615c494ddd3d2c0ba6005951e486d7b76e7f3
Author: zzb <[email protected]>
AuthorDate: Thu Aug 21 13:01:11 2025 +0800
[fix][broker] Fix race condition in MetadataStoreCacheLoader causing
inconsistent availableBroker list caching (#24639)
Co-authored-by: zhaizhibo <[email protected]>
---
.../broker/resources/MetadataStoreCacheLoader.java | 35 +++++++++++++---------
1 file changed, 21 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java
index 43376f40550..29451148da4 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java
@@ -25,10 +25,12 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +45,7 @@ public class MetadataStoreCacheLoader implements Closeable {
private final int operationTimeoutMs;
private volatile List<LoadManagerReport> availableBrokers;
+ private final FutureUtil.Sequencer<Void> sequencer;
private final OrderedScheduler orderedExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(8)
.name("pulsar-metadata-cache-loader-ordered-cache").build();
@@ -52,6 +55,7 @@ public class MetadataStoreCacheLoader implements Closeable {
public MetadataStoreCacheLoader(PulsarResources pulsarResources, int
operationTimeoutMs) throws Exception {
this.loadReportResources = pulsarResources.getLoadReportResources();
this.operationTimeoutMs = operationTimeoutMs;
+ this.sequencer = FutureUtil.Sequencer.create();
init();
}
@@ -61,26 +65,29 @@ public class MetadataStoreCacheLoader implements Closeable {
* @throws Exception
*/
public void init() throws Exception {
- loadReportResources.getStore().registerListener((n) -> {
- if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) &&
NotificationType.ChildrenChanged.equals(n.getType())) {
-
loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT).thenApplyAsync((brokerNodes)->{
- updateBrokerList(brokerNodes).thenRun(() -> {
- log.info("Successfully updated broker info {}",
brokerNodes);
- }).exceptionally(ex -> {
+ Supplier<CompletableFuture<Void>> tryUpdate = () -> {
+ return
loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT)
+ .thenComposeAsync(brokerNodes -> {
+ return updateBrokerList(brokerNodes).thenRun(() -> {
+ log.info("Successfully updated broker info {}",
brokerNodes);
+ });
+ })
+ .exceptionally(ex -> {
log.warn("Error updating broker info after broker list
changed", ex);
return null;
});
- return null;
- }).exceptionally(ex -> {
- log.warn("Error updating broker info after broker list
changed", ex);
- return null;
- });
+ };
+ loadReportResources.getStore().registerListener((n) -> {
+ if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) &&
NotificationType.ChildrenChanged.equals(n.getType())) {
+ sequencer.sequential(tryUpdate);
}
});
-
+ if (loadReportResources.getStore() instanceof MetadataStoreExtended) {
+ ((MetadataStoreExtended)
loadReportResources.getStore()).registerSessionListener(sessionEvent ->
+ sequencer.sequential(tryUpdate));
+ }
// Do initial fetch of brokers list
-
updateBrokerList(loadReportResources.getChildren(LOADBALANCE_BROKERS_ROOT)).get(operationTimeoutMs,
- TimeUnit.SECONDS);
+ tryUpdate.get().get(operationTimeoutMs, TimeUnit.MILLISECONDS);
}
public List<LoadManagerReport> getAvailableBrokers() {