Copilot commented on code in PR #24698:
URL: https://github.com/apache/pulsar/pull/24698#discussion_r2321870043
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -385,16 +386,12 @@ public CompletableFuture<Void> closeAsync() {
             timeout.cancel();
             recheckPatternTimeout = null;
         }
-        List<CompletableFuture<?>> closeFutures = new ArrayList<>(2);
-        if (watcherFuture.isDone() && !watcherFuture.isCompletedExceptionally()) {
-            TopicListWatcher watcher = watcherFuture.getNow(null);
-            // watcher can be null when subscription mode is not persistent
-            if (watcher != null) {
-                closeFutures.add(watcher.closeAsync());
-            }
-        }
-        closeFutures.add(updateTaskQueue.cancelAllAndWaitForTheRunningTask().thenCompose(__ -> super.closeAsync()));
-        return FutureUtil.waitForAll(closeFutures);
+        CompletableFuture<Void> watcherCloseFuture = watcherFuture.thenCompose(
+                topicListWatcher -> Optional.ofNullable(topicListWatcher).map(TopicListWatcher::closeAsync)
+                        .orElse(CompletableFuture.completedFuture(null))).exceptionally(t -> null);
+        CompletableFuture<Void> runningTaskCancelFuture = updateTaskQueue.cancelAllAndWaitForTheRunningTask();
+        return FutureUtil.waitForAll(Lists.newArrayList(watcherCloseFuture, runningTaskCancelFuture))
+                .exceptionally(t -> null).thenCompose(__ -> super.closeAsync());
Review Comment:
The `exceptionally(t -> null)` handler suppresses all exceptions from
watcher closure and task cancellation, which could mask important cleanup
failures. Consider logging the exception or allowing certain critical
exceptions to propagate.
```suggestion
                        .orElse(CompletableFuture.completedFuture(null)))
                .exceptionally(t -> {
                    log.error("Exception occurred while closing TopicListWatcher", t);
                    return null;
                });
        CompletableFuture<Void> runningTaskCancelFuture = updateTaskQueue.cancelAllAndWaitForTheRunningTask();
        return FutureUtil.waitForAll(Lists.newArrayList(watcherCloseFuture, runningTaskCancelFuture))
                .exceptionally(t -> {
                    log.error("Exception occurred while cancelling running update tasks or closing watcher", t);
                    return null;
                })
                .thenCompose(__ -> super.closeAsync());
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -385,16 +386,12 @@ public CompletableFuture<Void> closeAsync() {
             timeout.cancel();
             recheckPatternTimeout = null;
         }
-        List<CompletableFuture<?>> closeFutures = new ArrayList<>(2);
-        if (watcherFuture.isDone() && !watcherFuture.isCompletedExceptionally()) {
-            TopicListWatcher watcher = watcherFuture.getNow(null);
-            // watcher can be null when subscription mode is not persistent
-            if (watcher != null) {
-                closeFutures.add(watcher.closeAsync());
-            }
-        }
-        closeFutures.add(updateTaskQueue.cancelAllAndWaitForTheRunningTask().thenCompose(__ -> super.closeAsync()));
-        return FutureUtil.waitForAll(closeFutures);
+        CompletableFuture<Void> watcherCloseFuture = watcherFuture.thenCompose(
+                topicListWatcher -> Optional.ofNullable(topicListWatcher).map(TopicListWatcher::closeAsync)
+                        .orElse(CompletableFuture.completedFuture(null))).exceptionally(t -> null);
Review Comment:
The `exceptionally(t -> null)` handler silently swallows every failure in this
chain: a `watcherFuture` that completed exceptionally (watcher creation failed)
as well as a failed `closeAsync()` call. This could hide errors during the close
operation that might indicate underlying issues.
```suggestion
.orElse(CompletableFuture.completedFuture(null))).exceptionally(t -> {
log.error("Failed to close TopicListWatcher", t); return null; });
```
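To illustrate the logging variant suggested in both comments, here is a minimal, self-contained sketch. It is not Pulsar code: `Watcher`, `LOGGED`, and `closeWatcher` are hypothetical stand-ins, and a list takes the place of the real logger so the example runs on its own. It only shows that an `exceptionally` handler placed after `thenCompose` sees both a failed upstream future and a failed `closeAsync()`, and that recording the error still lets the returned future complete normally.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class QuietCloseSketch {
    /** Hypothetical stand-in for TopicListWatcher; only closeAsync() matters here. */
    public interface Watcher {
        CompletableFuture<Void> closeAsync();
    }

    /** Stand-in for a logger (assumption): messages are collected in a list. */
    public static final List<String> LOGGED = new CopyOnWriteArrayList<>();

    /**
     * Closes the watcher once watcherFuture completes. A null watcher is treated
     * as "nothing to close". Any failure (from watcherFuture itself or from
     * closeAsync()) is recorded and then replaced by a normal completion.
     */
    public static CompletableFuture<Void> closeWatcher(CompletableFuture<Watcher> watcherFuture) {
        return watcherFuture
                .thenCompose(w -> Optional.ofNullable(w)
                        .map(Watcher::closeAsync)
                        .orElse(CompletableFuture.completedFuture(null)))
                .exceptionally(t -> {
                    LOGGED.add("Failed to close watcher: " + t);
                    return null;
                });
    }

    public static void main(String[] args) {
        // Case 1: the watcher was never created because watcherFuture failed.
        CompletableFuture<Watcher> creationFailed = new CompletableFuture<>();
        creationFailed.completeExceptionally(new IllegalStateException("creation failed"));
        closeWatcher(creationFailed).join();

        // Case 2: the watcher exists but its closeAsync() fails.
        Watcher failingClose = () -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            f.completeExceptionally(new IllegalStateException("close failed"));
            return f;
        };
        closeWatcher(CompletableFuture.completedFuture(failingClose)).join();

        // Case 3: no watcher at all (null), so there is nothing to close or log.
        closeWatcher(CompletableFuture.completedFuture(null)).join();

        LOGGED.forEach(System.out::println);
    }
}
```

In all three cases the future returned by `closeWatcher` completes normally, so a subsequent `thenCompose(__ -> super.closeAsync())` step would still run; the difference from `exceptionally(t -> null)` is only that the failure leaves a trace.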
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.