This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cc824e504ba [fix][client]TopicListWatcher not closed when calling
PatternMultiTopicsConsumerImpl.closeAsync() method (#24698)
cc824e504ba is described below
commit cc824e504ba2fab6c00d76c327379910ecff8e69
Author: Oneby <[email protected]>
AuthorDate: Fri Sep 5 16:38:30 2025 +0800
[fix][client]TopicListWatcher not closed when calling
PatternMultiTopicsConsumerImpl.closeAsync() method (#24698)
---
.../client/impl/PatternMultiTopicsConsumerImpl.java | 17 +++++++----------
.../org/apache/pulsar/client/impl/TopicListWatcher.java | 13 ++++++++++---
2 files changed, 17 insertions(+), 13 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index a31b857d251..ee949593c19 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -28,6 +28,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -385,16 +386,12 @@ public class PatternMultiTopicsConsumerImpl<T> extends
MultiTopicsConsumerImpl<T
timeout.cancel();
recheckPatternTimeout = null;
}
- List<CompletableFuture<?>> closeFutures = new ArrayList<>(2);
- if (watcherFuture.isDone() &&
!watcherFuture.isCompletedExceptionally()) {
- TopicListWatcher watcher = watcherFuture.getNow(null);
- // watcher can be null when subscription mode is not persistent
- if (watcher != null) {
- closeFutures.add(watcher.closeAsync());
- }
- }
-
closeFutures.add(updateTaskQueue.cancelAllAndWaitForTheRunningTask().thenCompose(__
-> super.closeAsync()));
- return FutureUtil.waitForAll(closeFutures);
+ CompletableFuture<Void> watcherCloseFuture = watcherFuture.thenCompose(
+ topicListWatcher ->
Optional.ofNullable(topicListWatcher).map(TopicListWatcher::closeAsync)
+
.orElse(CompletableFuture.completedFuture(null))).exceptionally(t -> null);
+ CompletableFuture<Void> runningTaskCancelFuture =
updateTaskQueue.cancelAllAndWaitForTheRunningTask();
+ return FutureUtil.waitForAll(Lists.newArrayList(watcherCloseFuture,
runningTaskCancelFuture))
+ .exceptionally(t -> null).thenCompose(__ ->
super.closeAsync());
}
@VisibleForTesting
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 5ea1e22cc30..d83ae3e7337 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -47,6 +47,7 @@ public class TopicListWatcher extends HandlerState implements
ConnectionHandler.
private final ConnectionHandler connectionHandler;
private final TopicsPattern topicsPattern;
private final long watcherId;
+ private final long lookupDeadline;
private volatile long createWatcherDeadline = 0;
private final NamespaceName namespace;
// TODO maintain the value based on updates from broker and warn the user
if inconsistent with hash from polling
@@ -80,6 +81,7 @@ public class TopicListWatcher extends HandlerState implements
ConnectionHandler.
this);
this.topicsPattern = topicsPattern;
this.watcherId = watcherId;
+ this.lookupDeadline = System.currentTimeMillis() +
client.getConfiguration().getLookupTimeoutMs();
this.namespace = namespace;
this.topicsHash = topicsHash;
this.watcherFuture = watcherFuture;
@@ -91,12 +93,17 @@ public class TopicListWatcher extends HandlerState
implements ConnectionHandler.
@Override
public boolean connectionFailed(PulsarClientException exception) {
boolean nonRetriableError =
!PulsarClientException.isRetriableError(exception);
- if (nonRetriableError) {
+ boolean timeout = System.currentTimeMillis() > lookupDeadline;
+ if (nonRetriableError || timeout) {
exception.setPreviousExceptionCount(previousExceptionCount);
if (watcherFuture.completeExceptionally(exception)) {
setState(State.Failed);
- log.info("[{}] Watcher creation failed for {} with
non-retriable error {}",
- topic, name, exception.getMessage());
+ if (nonRetriableError) {
+ log.info("[{}] Watcher creation failed for {} with
non-retriable error {}", topic, name,
+ exception.getMessage());
+ } else {
+ log.info("[{}] Watcher creation failed for {} after
timeout", topic, name);
+ }
deregisterFromClientCnx();
return false;
}