This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0cda4f400b0 [improve][broker] Optimize Reader creation in TopicPoliciesService (#24658)
0cda4f400b0 is described below
commit 0cda4f400b0cb785447fc1a64edb4f97c17c309a
Author: 道君- Tao Jiuming <[email protected]>
AuthorDate: Tue Sep 9 11:31:12 2025 +0800
[improve][broker] Optimize Reader creation in TopicPoliciesService (#24658)
Co-authored-by: Zixuan Liu <[email protected]>
---
.../SystemTopicBasedTopicPoliciesService.java | 93 ++++++++++++++++++----
.../pulsar/broker/admin/TopicPoliciesTest.java | 6 ++
2 files changed, 84 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 880498209a6..7e3590f1bb8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -579,8 +579,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
return
policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
- createSystemTopicClient(namespace);
- readerCaches.put(namespace,
readerCompletableFuture);
+ newReader(namespace);
final CompletableFuture<Void> initFuture =
readerCompletableFuture
.thenCompose(reader -> {
final CompletableFuture<Void>
stageFuture = new CompletableFuture<>();
@@ -594,9 +593,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (closed.get()) {
return null;
}
- log.error("[{}] Failed to create
reader on __change_events topic",
- namespace, ex);
- cleanCacheAndCloseReader(namespace,
false);
+ cleanPoliciesCacheInitMap(
+ namespace,
readerCompletableFuture.isCompletedExceptionally());
} catch (Throwable cleanupEx) {
// Adding this catch to avoid break
callback chain
log.error("[{}] Failed to cleanup
reader on __change_events topic",
@@ -610,6 +608,20 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
+ private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
newReader(NamespaceName ns) {
+ return readerCaches.compute(ns, (__, existingFuture) -> {
+ if (existingFuture == null) {
+ return createSystemTopicClient(ns);
+ }
+
+ if (existingFuture.isDone() &&
existingFuture.isCompletedExceptionally()) {
+ return existingFuture.exceptionallyCompose(ex ->
+ isAlreadyClosedException(ex) ? existingFuture :
createSystemTopicClient(ns));
+ }
+ return existingFuture;
+ });
+ }
+
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
createSystemTopicClient(
NamespaceName namespace) {
if (closed.get()) {
@@ -633,7 +645,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
AtomicInteger bundlesCount =
ownedBundlesCountPerNamespace.get(namespace);
if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
- cleanCacheAndCloseReader(namespace, true, true);
+ cleanPoliciesCacheInitMap(namespace, true);
+ cleanWriterCache(namespace);
+ cleanOwnedBundlesCount(namespace);
}
}
@@ -665,7 +679,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent>
reader, CompletableFuture<Void> future) {
if (closed.get()) {
future.completeExceptionally(new
BrokerServiceException(getClass().getName() + " is closed."));
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
+
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
true);
return;
}
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
@@ -673,7 +687,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.error("[{}] Failed to check the move events for the system
topic",
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
+
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
+ isAlreadyClosedException(ex));
return;
}
if (hasMore) {
@@ -692,7 +707,8 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
log.error("[{}] Failed to read event from the system
topic.",
reader.getSystemTopic().getTopicName(), e);
future.completeExceptionally(e);
-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(),
false);
+
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
+ isAlreadyClosedException(ex));
return null;
});
} else {
@@ -718,10 +734,45 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
- private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace,
boolean cleanOwnedBundlesCount) {
- cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
+
+ private void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace,
boolean closeReader) {
+ if (!closeReader) {
+ policyCacheInitMap.remove(namespace);
+ return;
+ }
+
+ TopicPolicyMessageHandlerTracker topicPolicyMessageHandlerTracker =
+ topicPolicyMessageHandlerTrackers.remove(namespace);
+ if (topicPolicyMessageHandlerTracker != null) {
+ topicPolicyMessageHandlerTracker.close();
+ }
+
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture
= readerCaches.remove(namespace);
+ policyCacheInitMap.compute(namespace, (k, v) -> {
+ policiesCache.entrySet().removeIf(entry ->
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+ globalPoliciesCache.entrySet()
+ .removeIf(entry ->
Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+ return null;
+ });
+ if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+ readerFuture
+ .thenCompose(SystemTopicClient.Reader::closeAsync)
+ .exceptionally(ex -> {
+ log.warn("[{}] Close change_event reader fail.",
namespace, ex);
+ return null;
+ });
+ }
+ }
+
+ private void cleanWriterCache(@NonNull NamespaceName namespace) {
+ writerCaches.synchronous().invalidate(namespace);
}
+ private void cleanOwnedBundlesCount(@NonNull NamespaceName namespace) {
+ ownedBundlesCountPerNamespace.remove(namespace);
+ }
+
+
private void cleanCacheAndCloseReader(@NonNull NamespaceName namespace,
boolean cleanOwnedBundlesCount,
boolean cleanWriterCache) {
if (cleanWriterCache) {
@@ -754,6 +805,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
+
+
+
/**
* This is an async method for the background reader to continue syncing
new messages.
*
@@ -763,7 +817,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent>
reader) {
NamespaceName namespaceObject =
reader.getSystemTopic().getTopicName().getNamespaceObject();
if (closed.get()) {
- cleanCacheAndCloseReader(namespaceObject, false);
+ cleanPoliciesCacheInitMap(namespaceObject, true);
return;
}
reader.readNextAsync()
@@ -784,11 +838,10 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
if (ex == null) {
readMorePoliciesAsync(reader);
} else {
- Throwable cause =
FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof
PulsarClientException.AlreadyClosedException) {
+ if (isAlreadyClosedException(ex)) {
log.info("Closing the topic policies reader for
{}",
reader.getSystemTopic().getTopicName());
- cleanCacheAndCloseReader(namespaceObject, false);
+ cleanPoliciesCacheInitMap(namespaceObject, true);
} else {
log.warn("Read more topic polices exception, read
again.", ex);
readMorePoliciesAsync(reader);
@@ -797,6 +850,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
});
}
+ private boolean isAlreadyClosedException(Throwable ex) {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ return cause instanceof PulsarClientException.AlreadyClosedException;
+ }
+
private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
// delete policies
if (msg.getValue() == null) {
@@ -884,6 +942,11 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
+ @VisibleForTesting
+ public Map<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> getReaderCaches() {
+ return readerCaches;
+ }
+
@VisibleForTesting
long getPoliciesCacheSize() {
return policiesCache.size();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index a41ee20137b..0f854392748 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -65,6 +65,7 @@ import
org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -79,6 +80,7 @@ import
org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -3580,6 +3582,10 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
policyCacheInitMap.clear();
policiesCache.clear();
globalPoliciesCache.clear();
+
+ Map<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readerCaches =
+ ((SystemTopicBasedTopicPoliciesService)
topicPoliciesService).getReaderCaches();
+ readerCaches.clear();
}
@DataProvider(name = "reloadPolicyTypes")