This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cc824e504ba [fix][client]TopicListWatcher not closed when calling PatternMultiTopicsConsumerImpl.closeAsync() method (#24698)
cc824e504ba is described below

commit cc824e504ba2fab6c00d76c327379910ecff8e69
Author: Oneby <[email protected]>
AuthorDate: Fri Sep 5 16:38:30 2025 +0800

    [fix][client]TopicListWatcher not closed when calling PatternMultiTopicsConsumerImpl.closeAsync() method (#24698)
---
 .../client/impl/PatternMultiTopicsConsumerImpl.java     | 17 +++++++----------
 .../org/apache/pulsar/client/impl/TopicListWatcher.java | 13 ++++++++++---
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index a31b857d251..ee949593c19 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -385,16 +386,12 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
             timeout.cancel();
             recheckPatternTimeout = null;
         }
-        List<CompletableFuture<?>> closeFutures = new ArrayList<>(2);
-        if (watcherFuture.isDone() && 
!watcherFuture.isCompletedExceptionally()) {
-            TopicListWatcher watcher = watcherFuture.getNow(null);
-            // watcher can be null when subscription mode is not persistent
-            if (watcher != null) {
-                closeFutures.add(watcher.closeAsync());
-            }
-        }
-        
closeFutures.add(updateTaskQueue.cancelAllAndWaitForTheRunningTask().thenCompose(__
 -> super.closeAsync()));
-        return FutureUtil.waitForAll(closeFutures);
+        CompletableFuture<Void> watcherCloseFuture = watcherFuture.thenCompose(
+                topicListWatcher -> 
Optional.ofNullable(topicListWatcher).map(TopicListWatcher::closeAsync)
+                        
.orElse(CompletableFuture.completedFuture(null))).exceptionally(t -> null);
+        CompletableFuture<Void> runningTaskCancelFuture = 
updateTaskQueue.cancelAllAndWaitForTheRunningTask();
+        return FutureUtil.waitForAll(Lists.newArrayList(watcherCloseFuture, 
runningTaskCancelFuture))
+                .exceptionally(t -> null).thenCompose(__ -> 
super.closeAsync());
     }
 
     @VisibleForTesting
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 5ea1e22cc30..d83ae3e7337 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -47,6 +47,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
     private final ConnectionHandler connectionHandler;
     private final TopicsPattern topicsPattern;
     private final long watcherId;
+    private final long lookupDeadline;
     private volatile long createWatcherDeadline = 0;
     private final NamespaceName namespace;
     // TODO maintain the value based on updates from broker and warn the user 
if inconsistent with hash from polling
@@ -80,6 +81,7 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
                 this);
         this.topicsPattern = topicsPattern;
         this.watcherId = watcherId;
+        this.lookupDeadline = System.currentTimeMillis() + 
client.getConfiguration().getLookupTimeoutMs();
         this.namespace = namespace;
         this.topicsHash = topicsHash;
         this.watcherFuture = watcherFuture;
@@ -91,12 +93,17 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
     @Override
     public boolean connectionFailed(PulsarClientException exception) {
         boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
-        if (nonRetriableError) {
+        boolean timeout = System.currentTimeMillis() > lookupDeadline;
+        if (nonRetriableError || timeout) {
             exception.setPreviousExceptionCount(previousExceptionCount);
             if (watcherFuture.completeExceptionally(exception)) {
                 setState(State.Failed);
-                log.info("[{}] Watcher creation failed for {} with 
non-retriable error {}",
-                        topic, name, exception.getMessage());
+                if (nonRetriableError) {
+                    log.info("[{}] Watcher creation failed for {} with 
non-retriable error {}", topic, name,
+                            exception.getMessage());
+                } else {
+                    log.info("[{}] Watcher creation failed for {} after 
timeout", topic, name);
+                }
                 deregisterFromClientCnx();
                 return false;
             }

Reply via email to