Copilot commented on code in PR #25295:
URL: https://github.com/apache/pulsar/pull/25295#discussion_r2904199585


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -78,6 +84,18 @@
  */
 public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
 
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
+            "pulsar_topic_policies_cache_init_failures_total",
+            "Total number of topic policies cache initialization failures after all retries exhausted")
+            .labelNames("namespace")

Review Comment:
   Labeling these counters by full `namespace` can create unbounded metric 
cardinality (potentially one time series per namespace per broker), which is a 
common source of Prometheus memory/CPU pressure. Consider reducing cardinality 
(e.g., remove the label, use tenant-only, or gate per-namespace labeling behind 
a config) while still meeting the “emit a metric on failure” requirement.
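
   For illustration only, a bounded-cardinality variant could label by tenant alone. The helper below is hypothetical (not part of this PR) and assumes the usual `tenant/namespace` naming convention:

   ```java
   public class TenantLabelSketch {
       // Hypothetical helper: map a "tenant/namespace" name to a tenant-only
       // metric label, so the number of time series is bounded by the tenant
       // count rather than the namespace count.
       static String tenantLabel(String namespace) {
           int slash = namespace.indexOf('/');
           return slash > 0 ? namespace.substring(0, slash) : namespace;
       }

       public static void main(String[] args) {
           // e.g. TOPIC_POLICIES_CACHE_INIT_FAILURES.labels(tenantLabel(ns)).inc();
           System.out.println(tenantLabel("public/default"));   // public
           System.out.println(tenantLabel("my-tenant/orders")); // my-tenant
       }
   }
   ```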



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -619,6 +626,123 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
                 });
     }
 
+    /**
+     * Initializes the topic policies cache with timeout and retry support.
+     * On each attempt, a new reader is created, and {@link #initPolicesCache} is called with a timeout.
+     * If the initialization times out, the reader is closed and a new attempt is made.
+     * After all retries are exhausted, namespace bundles are unloaded from this broker so they can be
+     * reassigned to a different broker.
+     *
+     * @param namespace the namespace to initialize policies for
+     * @param retriesLeft number of retries remaining
+     * @return a future that completes when initialization succeeds or fails after all retries
+     */
+    private CompletableFuture<Void> initPoliciesCacheWithTimeoutAndRetry(NamespaceName namespace, int retriesLeft) {
+        if (closed.get()) {
+            return CompletableFuture.failedFuture(
+                    new BrokerServiceException(getClass().getName() + " is closed."));
+        }
+
+        long timeoutSeconds = pulsarService.getConfiguration().getTopicPoliciesCacheInitTimeoutSeconds();
+        final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = newReader(namespace);
+
+        CompletableFuture<Void> attempt = readerFuture.thenCompose(reader -> {
+            final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
+            initPolicesCache(reader, stageFuture);
+
+            CompletableFuture<Void> timedFuture = timeoutSeconds > 0
+                    ? stageFuture.orTimeout(timeoutSeconds, TimeUnit.SECONDS)
+                    : stageFuture;
+
+            return timedFuture.thenAccept(__ -> readMorePoliciesAsync(reader));
+        });
+
+        return attempt
+                .thenApply(v -> CompletableFuture.completedFuture(v))
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    if (cause instanceof TimeoutException) {
+                        TOPIC_POLICIES_CACHE_INIT_TIMEOUTS.labels(namespace.toString()).inc();
+                        // Close the stuck reader and remove from cache so a new one can be created
+                        closeAndRemoveReaderForNamespace(namespace);
+
+                        if (retriesLeft > 0) {
+                            log.warn("[{}] Topic policies cache initialization timed out after {}s. "
+                                            + "Retrying... ({} retries left)",
+                                    namespace, timeoutSeconds, retriesLeft);
+                            return initPoliciesCacheWithTimeoutAndRetry(namespace, retriesLeft - 1);
+                        } else {
+                            log.error("[{}] Topic policies cache initialization failed after all retries "
+                                            + "(timed out after {}s per attempt). Unloading namespace bundles "
+                                            + "from this broker.",
+                                    namespace, timeoutSeconds);
+                            TOPIC_POLICIES_CACHE_INIT_FAILURES.labels(namespace.toString()).inc();
+                            unloadNamespaceBundlesAsync(namespace);
+                            return CompletableFuture.<Void>failedFuture(
+                                    new BrokerServiceException(
+                                            "Topic policies cache initialization failed after all retries "
+                                                    + "for namespace " + namespace));
+                        }
+                    }
+                    // For non-timeout exceptions (e.g. reader creation failure), propagate directly
+                    return CompletableFuture.<Void>failedFuture(cause);
+                })
+                .thenCompose(Function.identity());

Review Comment:
   The `thenApply(v -> completedFuture(v))` + `exceptionally(...)` + 
`thenCompose(identity())` pattern is harder to read and maintain than 
necessary. Consider rewriting using a single `handle`/`whenComplete`-style 
branch that returns either a value or a next-stage future (and then 
`thenCompose` once), or using `exceptionallyCompose` if the project’s Java 
target supports it—this will avoid nested futures and make the retry/failure 
flow clearer.
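
   As a sketch of the `exceptionallyCompose` shape (with a simplified, hypothetical `attempt()` standing in for the real reader/cache wiring, so the retry flow is visible in isolation):

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicInteger;

   public class RetrySketch {
       static final AtomicInteger calls = new AtomicInteger();

       // Stand-in for one initialization attempt: times out on the first
       // call, succeeds on the second. Hypothetical, for illustration only.
       static CompletableFuture<Void> attempt() {
           if (calls.incrementAndGet() == 1) {
               return CompletableFuture.failedFuture(new TimeoutException("init timed out"));
           }
           return CompletableFuture.completedFuture(null);
       }

       // One exceptionallyCompose branch replaces the
       // thenApply + exceptionally + thenCompose(identity()) chain.
       static CompletableFuture<Void> initWithRetry(int retriesLeft) {
           return attempt().exceptionallyCompose(ex -> {
               Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
               if (cause instanceof TimeoutException && retriesLeft > 0) {
                   return initWithRetry(retriesLeft - 1);
               }
               return CompletableFuture.failedFuture(cause);
           });
       }

       public static void main(String[] args) {
           initWithRetry(2).join(); // first attempt times out, retry succeeds
           System.out.println("attempts=" + calls.get()); // attempts=2
       }
   }
   ```

   `exceptionallyCompose` requires Java 12+, hence the caveat about the project's Java target.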



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -78,6 +84,18 @@
  */
 public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
 
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
+            "pulsar_topic_policies_cache_init_failures_total",
+            "Total number of topic policies cache initialization failures after all retries exhausted")
+            .labelNames("namespace")
+            .register();
+
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
+            "pulsar_topic_policies_cache_init_timeouts_total",
+            "Total number of topic policies cache initialization timeouts (including retried attempts)")
+            .labelNames("namespace")
+            .register();

Review Comment:
   Static `Counter.build(...).register()` registers into the global default 
Prometheus registry at class-load time. In long-running brokers and especially 
in unit/integration tests that may load/reload components in the same JVM, this 
pattern can trigger duplicate-collector registration errors and makes metric 
lifecycle hard to control. Prefer wiring metrics through Pulsar’s existing 
metrics/registry facilities (or a registry owned by the broker instance) rather 
than static global registration in the service class.
   ```suggestion
               .create();
   
       private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
               "pulsar_topic_policies_cache_init_timeouts_total",
               "Total number of topic policies cache initialization timeouts (including retried attempts)")
               .labelNames("namespace")
               .create();
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -581,29 +599,18 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
                     CompletableFuture<Void> existingFuture =
                             policyCacheInitMap.putIfAbsent(namespace, initNamespacePolicyFuture);
                     if (existingFuture == null) {
-                        final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
-                                newReader(namespace);
-                        readerCompletableFuture
-                                .thenCompose(reader -> {
-                                    final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
-                                    initPolicesCache(reader, stageFuture);
-                                    return stageFuture
-                                            // Read policies in background
-                                            .thenAccept(__ -> readMorePoliciesAsync(reader));
-                                }).thenApply(__ -> {
+                        int maxRetries = pulsarService.getConfiguration()
+                                .getTopicPoliciesCacheInitMaxRetries();
+                        initPoliciesCacheWithTimeoutAndRetry(namespace, maxRetries)
+                                .thenApply(__ -> {
                                     initNamespacePolicyFuture.complete(null);
                                     return null;
                                 }).exceptionally(ex -> {
                                     try {
-                                        if (readerCompletableFuture.isCompletedExceptionally()) {
-                                            log.error("[{}] Failed to create reader on __change_events topic",
-                                                    namespace, ex);
-                                            initNamespacePolicyFuture.completeExceptionally(ex);
-                                            cleanPoliciesCacheInitMap(namespace, true);
-                                        } else {
-                                            initNamespacePolicyFuture.completeExceptionally(ex);
-                                            cleanPoliciesCacheInitMap(namespace, isAlreadyClosedException(ex));
-                                        }
+                                        log.error("[{}] Failed to initialize topic policies cache",
+                                                namespace, ex);
+                                        initNamespacePolicyFuture.completeExceptionally(ex);
+                                        cleanPoliciesCacheInitMap(namespace, true);

Review Comment:
   This changes cleanup behavior from the prior logic that conditioned the 
boolean argument on the exception type (e.g., `isAlreadyClosedException(ex)`), 
and also previously treated reader-creation failures differently. Always 
passing `true` can alter shutdown/cleanup semantics and may cause incorrect 
cleanup decisions for non-close-related failures. Suggest restoring the prior 
decision logic (e.g., pass `isAlreadyClosedException(ex)` where appropriate, 
and keep the more specific branching for reader creation vs. later-stage 
failures if it impacts cleanup).
   ```suggestion
                                           cleanPoliciesCacheInitMap(namespace,
                                                   ex instanceof PulsarClientException.AlreadyClosedException
                                                           || ex.getCause() instanceof PulsarClientException.AlreadyClosedException);
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -623,12 +624,148 @@ public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() thro
         // make sure not do cleanPoliciesCacheInitMap() twice
         // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
         boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
-                logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
+                logEvent.getMessage().toString().contains("Failed to initialize topic policies cache"));
         assertTrue(logFound);
         boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                 logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")
                         || logEvent.getMessage().toString().contains("Failed to read event from the system topic"));
         assertFalse(logFound2);
         verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());
     }
+
+    @Test
+    public void testInitPoliciesCacheTimeoutWithSuccessfulRetry() throws Exception {
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
+
+        pulsar.getTopicPoliciesService().close();
+        // Set a very short timeout and allow 2 retries
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(1);
+        conf.setTopicPoliciesCacheInitMaxRetries(2);
+
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/testTimeout" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader)
+        SystemTopicClient.Reader<PulsarEvent> mockReader = Mockito.mock(SystemTopicClient.Reader.class);
+        SystemTopicClient<PulsarEvent> mockSystemTopic = Mockito.mock(SystemTopicClient.class);
+        TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events");
+        Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic);
+        Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic);
+        // First call: never complete (will timeout). Second call: return false (no more events)
+        CompletableFuture<Boolean> neverCompleteFuture = new CompletableFuture<>();
+        Mockito.when(mockReader.hasMoreEventsAsync())
+                .thenReturn(neverCompleteFuture)
+                .thenReturn(CompletableFuture.completedFuture(false));
+        Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        // Put the mock reader in reader cache
+        ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader));
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
+
+        // On retry (after the stuck reader is removed), create a real reader
+        Mockito.doAnswer(invocation -> {
+            NamespaceName ns = invocation.getArgument(0);
+            // Return a real reader for the retry
+            return spyReaderCaches.compute(ns, (k, v) -> {
+                if (v == null) {
+                    return CompletableFuture.completedFuture(mockReader);
+                }
+                return v;
+            });
+        }).when(spyService).createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+
+        CompletableFuture<Boolean> prepareFuture =
+                spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+
+        // The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
+        // returns false on second call)
+        try {
+            prepareFuture.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Retry may or may not succeed depending on mock setup; the important thing is
+            // the timeout was detected
+        }
+

Review Comment:
   This test swallows all exceptions and explicitly allows the retry to “may or 
may not succeed”, which makes the test non-deterministic and not aligned with 
its name (“SuccessfulRetry”). It should assert a clear expected outcome (e.g., 
`prepareFuture` completes successfully and `hasMoreEventsAsync()` is invoked 
twice, or at minimum that a retry occurred via verifiable interactions), 
otherwise the test can pass even if the retry logic is broken.
   ```suggestion
           // The first attempt should time out, and the second attempt should succeed (since
           // hasMoreEventsAsync returns false on the second call)
           Boolean initResult = prepareFuture.get(30, TimeUnit.SECONDS);
           assertTrue(initResult);
           // Verify that hasMoreEventsAsync was invoked twice (initial attempt + retry)
           verify(mockReader, times(2)).hasMoreEventsAsync();
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -623,12 +624,148 @@ public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() thro
         // make sure not do cleanPoliciesCacheInitMap() twice
         // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
         boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
-                logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
+                logEvent.getMessage().toString().contains("Failed to initialize topic policies cache"));
         assertTrue(logFound);
         boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                 logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")
                         || logEvent.getMessage().toString().contains("Failed to read event from the system topic"));
         assertFalse(logFound2);
         verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());
     }
+
+    @Test
+    public void testInitPoliciesCacheTimeoutWithSuccessfulRetry() throws Exception {
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
+
+        pulsar.getTopicPoliciesService().close();
+        // Set a very short timeout and allow 2 retries
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(1);
+        conf.setTopicPoliciesCacheInitMaxRetries(2);
+
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/testTimeout" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader)
+        SystemTopicClient.Reader<PulsarEvent> mockReader = Mockito.mock(SystemTopicClient.Reader.class);
+        SystemTopicClient<PulsarEvent> mockSystemTopic = Mockito.mock(SystemTopicClient.class);
+        TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events");
+        Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic);
+        Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic);
+        // First call: never complete (will timeout). Second call: return false (no more events)
+        CompletableFuture<Boolean> neverCompleteFuture = new CompletableFuture<>();
+        Mockito.when(mockReader.hasMoreEventsAsync())
+                .thenReturn(neverCompleteFuture)
+                .thenReturn(CompletableFuture.completedFuture(false));
+        Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        // Put the mock reader in reader cache
+        ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader));
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
+
+        // On retry (after the stuck reader is removed), create a real reader
+        Mockito.doAnswer(invocation -> {
+            NamespaceName ns = invocation.getArgument(0);
+            // Return a real reader for the retry
+            return spyReaderCaches.compute(ns, (k, v) -> {
+                if (v == null) {
+                    return CompletableFuture.completedFuture(mockReader);
+                }
+                return v;
+            });
+        }).when(spyService).createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+
+        CompletableFuture<Boolean> prepareFuture =
+                spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+
+        // The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
+        // returns false on second call)
+        try {
+            prepareFuture.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Retry may or may not succeed depending on mock setup; the important thing is
+            // the timeout was detected
+        }
+
+        // Verify that the timeout was detected and retry was attempted
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                    logEvent.getMessage().toString().contains(
+                            "Topic policies cache initialization timed out"));
+            assertTrue(timeoutLogFound);
+        });
+
+        // Reset config
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
+        conf.setTopicPoliciesCacheInitMaxRetries(3);
+    }
+
+    @Test
+    public void testInitPoliciesCacheTimeoutExhaustsRetries() throws Exception {
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
+
+        pulsar.getTopicPoliciesService().close();
+        // Set a very short timeout and 0 retries so it fails immediately after first timeout
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(1);
+        conf.setTopicPoliciesCacheInitMaxRetries(0);
+
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+
+        // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader)
+        SystemTopicClient.Reader<PulsarEvent> mockReader = Mockito.mock(SystemTopicClient.Reader.class);
+        SystemTopicClient<PulsarEvent> mockSystemTopic = Mockito.mock(SystemTopicClient.class);
+        TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events");
+        Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic);
+        Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic);
+        Mockito.when(mockReader.hasMoreEventsAsync()).thenReturn(new CompletableFuture<>());
+        Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader));
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
+
+        CompletableFuture<Boolean> prepareFuture =
+                spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+
+        try {
+            prepareFuture.get(30, TimeUnit.SECONDS);
+            Assert.fail("Should have failed after retries exhausted");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause().getMessage().contains(
+                    "Topic policies cache initialization failed after all retries"));
+        }
+
+        // Verify the failure log was emitted
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            boolean failureLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                    logEvent.getMessage().toString().contains(
+                            "Topic policies cache initialization failed after all retries"));
+            assertTrue(failureLogFound);
+        });
+
+        // Verify that the unloading log was emitted (may be "No owned bundles" or "Unloading")
+        boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
+            String msg = logEvent.getMessage().toString();
+            return msg.contains("Unloading") && msg.contains("namespace bundles")
+                    || msg.contains("No owned bundles found to unload");
+        });
+        assertTrue(unloadLogFound);

Review Comment:
   This assertion is potentially racy because unloading is initiated 
asynchronously; unlike the failure-log assertion above, it doesn’t wait for the 
unload log to appear. Use `Awaitility` (or otherwise synchronize) around this 
check to make the test deterministic.
   ```suggestion
           Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
               boolean unloadLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> {
                   String msg = logEvent.getMessage().toString();
                   return (msg.contains("Unloading") && msg.contains("namespace bundles"))
                           || msg.contains("No owned bundles found to unload");
               });
               assertTrue(unloadLogFound);
           });
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -78,6 +84,18 @@
  */
 public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
 
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_FAILURES = Counter.build(
+            "pulsar_topic_policies_cache_init_failures_total",
+            "Total number of topic policies cache initialization failures after all retries exhausted")
+            .labelNames("namespace")
+            .register();
+
+    private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
+            "pulsar_topic_policies_cache_init_timeouts_total",
+            "Total number of topic policies cache initialization timeouts (including retried attempts)")
+            .labelNames("namespace")

Review Comment:
   Labeling these counters by full `namespace` can create unbounded metric 
cardinality (potentially one time series per namespace per broker), which is a 
common source of Prometheus memory/CPU pressure. Consider reducing cardinality 
(e.g., remove the label, use tenant-only, or gate per-namespace labeling behind 
a config) while still meeting the “emit a metric on failure” requirement.
   ```suggestion
               "Total number of topic policies cache initialization failures after all retries exhausted, per tenant")
               .labelNames("tenant")
               .register();
   
       private static final Counter TOPIC_POLICIES_CACHE_INIT_TIMEOUTS = Counter.build(
               "pulsar_topic_policies_cache_init_timeouts_total",
               "Total number of topic policies cache initialization timeouts (including retried attempts), per tenant")
               .labelNames("tenant")
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -623,12 +624,148 @@ public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() thro
         // make sure not do cleanPoliciesCacheInitMap() twice
         // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
         boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
-                logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
+                logEvent.getMessage().toString().contains("Failed to initialize topic policies cache"));
         assertTrue(logFound);
         boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                 logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")
                         || logEvent.getMessage().toString().contains("Failed to read event from the system topic"));
         assertFalse(logFound2);
         verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());
     }
+
+    @Test
+    public void testInitPoliciesCacheTimeoutWithSuccessfulRetry() throws Exception {
+        @Cleanup
+        TestLogAppender testLogAppender = TestLogAppender.create(log);
+
+        pulsar.getTopicPoliciesService().close();
+        // Set a very short timeout and allow 2 retries
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(1);
+        conf.setTopicPoliciesCacheInitMaxRetries(2);
+
+        SystemTopicBasedTopicPoliciesService spyService =
+                Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+        FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
+
+        admin.namespaces().createNamespace(NAMESPACE5);
+        final String topic = "persistent://" + NAMESPACE5 + "/testTimeout" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+
+        // Create a reader that never completes hasMoreEventsAsync (simulates a stuck reader)
+        SystemTopicClient.Reader<PulsarEvent> mockReader = Mockito.mock(SystemTopicClient.Reader.class);
+        SystemTopicClient<PulsarEvent> mockSystemTopic = Mockito.mock(SystemTopicClient.class);
+        TopicName changeEventsTopic = TopicName.get("persistent://" + NAMESPACE5 + "/__change_events");
+        Mockito.when(mockSystemTopic.getTopicName()).thenReturn(changeEventsTopic);
+        Mockito.when(mockReader.getSystemTopic()).thenReturn(mockSystemTopic);
+        // First call: never complete (will timeout). Second call: return false (no more events)
+        CompletableFuture<Boolean> neverCompleteFuture = new CompletableFuture<>();
+        Mockito.when(mockReader.hasMoreEventsAsync())
+                .thenReturn(neverCompleteFuture)
+                .thenReturn(CompletableFuture.completedFuture(false));
+        Mockito.when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        // Put the mock reader in reader cache
+        ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+                spyReaderCaches = new ConcurrentHashMap<>();
+        spyReaderCaches.put(NamespaceName.get(NAMESPACE5), CompletableFuture.completedFuture(mockReader));
+        FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
+
+        // On retry (after the stuck reader is removed), create a real reader
+        Mockito.doAnswer(invocation -> {
+            NamespaceName ns = invocation.getArgument(0);
+            // Return a real reader for the retry
+            return spyReaderCaches.compute(ns, (k, v) -> {
+                if (v == null) {
+                    return CompletableFuture.completedFuture(mockReader);
+                }
+                return v;
+            });
+        }).when(spyService).createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+
+        CompletableFuture<Boolean> prepareFuture =
+                spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+
+        // The first attempt times out, the second attempt should succeed (since hasMoreEventsAsync
+        // returns false on second call)
+        try {
+            prepareFuture.get(30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Retry may or may not succeed depending on mock setup; the important thing is
+            // the timeout was detected
+        }
+
+        // Verify that the timeout was detected and retry was attempted
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
+                    logEvent.getMessage().toString().contains(
+                            "Topic policies cache initialization timed out"));
+            assertTrue(timeoutLogFound);
+        });
+
+        // Reset config
+        conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
+        conf.setTopicPoliciesCacheInitMaxRetries(3);

Review Comment:
   The config reset is not in a `finally` block. If the test fails before 
reaching these lines, it can leak altered broker config into subsequent tests 
and cause cascading failures. Consider wrapping the body in try/finally (or use 
a test framework hook) to guarantee restoration.
   ```suggestion
            try {
                prepareFuture.get(30, TimeUnit.SECONDS);
            } catch (Exception e) {
                // Retry may or may not succeed depending on mock setup; the important thing is
                // the timeout was detected
            }
   
            // Verify that the timeout was detected and retry was attempted
            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
                boolean timeoutLogFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
                        logEvent.getMessage().toString().contains(
                                "Topic policies cache initialization timed out"));
                assertTrue(timeoutLogFound);
            });
        } finally {
            // Reset config
            conf.setTopicPoliciesCacheInitTimeoutSeconds(300);
            conf.setTopicPoliciesCacheInitMaxRetries(3);
        }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
