kirktrue commented on code in PR #19917:
URL: https://github.com/apache/kafka/pull/19917#discussion_r2150949379


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -166,9 +161,48 @@ public boolean contains(CompletableEvent<?> event) {
     }
 
     public List<CompletableEvent<?>> uncompletedEvents() {
-        return tracked.stream()
-                .filter(e -> !e.future().isDone())
-                .collect(Collectors.toList());
+        List<CompletableEvent<?>> events = new ArrayList<>();
+
+        for (CompletableEvent<?> event : tracked) {
+            if (!event.future().isDone())
+                events.add(event);
+        }
+
+        return events;
+    }
+
+    /**
+     * For all the {@link CompletableEvent}s in the collection, if they're not 
already complete, invoke
+     * {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *
+     * @param events Collection of objects, assumed to be subclasses of {@link 
ApplicationEvent} or
+     *               {@link BackgroundEvent}, but will only perform completion 
for any
+     *               unfinished {@link CompletableEvent}s
+     *
+     *  @return Number of events closed
+     */
+    private long completeEventsExceptionallyOnClose(Collection<?> events) {
+        long count = 0;
+
+        for (Object o : events) {
+            if (!(o instanceof CompletableEvent))
+                continue;
+
+            CompletableEvent<?> event = (CompletableEvent<?>) o;
+
+            if (event.future().isDone())
+                continue;
+
+            TimeoutException error = new TimeoutException(String.format("%s 
could not be completed before the consumer closed", 
event.getClass().getSimpleName()));
+
+            if (event.future().completeExceptionally(error)) {
+                log.debug("Event {} completed exceptionally since the consumer 
is closing", event);
+                count++;

Review Comment:
   Yes, it's different. I assert it's more accurate, but I changed it to mimic 
the existing counting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to