kirktrue commented on code in PR #19917:
URL: https://github.com/apache/kafka/pull/19917#discussion_r2150949379


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -166,9 +161,48 @@ public boolean contains(CompletableEvent<?> event) {
     }
 
     public List<CompletableEvent<?>> uncompletedEvents() {
-        return tracked.stream()
-                .filter(e -> !e.future().isDone())
-                .collect(Collectors.toList());
+        List<CompletableEvent<?>> events = new ArrayList<>();
+
+        for (CompletableEvent<?> event : tracked) {
+            if (!event.future().isDone())
+                events.add(event);
+        }
+
+        return events;
+    }
+
+    /**
+     * For all the {@link CompletableEvent}s in the collection, if they're not already complete, invoke
+     * {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *
+     * @param events Collection of objects, assumed to be subclasses of {@link ApplicationEvent} or
+     *               {@link BackgroundEvent}, but will only perform completion for any
+     *               unfinished {@link CompletableEvent}s
+     *
+     * @return Number of events closed
+     */
+    private long completeEventsExceptionallyOnClose(Collection<?> events) {
+        long count = 0;
+
+        for (Object o : events) {
+            if (!(o instanceof CompletableEvent))
+                continue;
+
+            CompletableEvent<?> event = (CompletableEvent<?>) o;
+
+            if (event.future().isDone())
+                continue;
+
+            TimeoutException error = new TimeoutException(String.format("%s could not be completed before the consumer closed", event.getClass().getSimpleName()));
+
+            if (event.future().completeExceptionally(error)) {
+                log.debug("Event {} completed exceptionally since the consumer is closing", event);
+                count++;

Review Comment:
   Yes, it's different. I assert it's more accurate, but I changed it to mimic the existing counting.
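
   For readers following the thread, a minimal standalone sketch of the two counting styles under discussion. It relies only on the documented behavior that `CompletableFuture#completeExceptionally(Throwable)` returns `true` when that call is the one that completed the future. The class and method names are hypothetical, `java.util.concurrent.TimeoutException` stands in for the Kafka exception type, and the "unconditional" variant is an assumption about what the existing counting does, not a copy of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration class; not part of the Kafka code base.
class CloseCountingSketch {

    // Counts only futures that this call actually moved to a completed state.
    static long countByReturnValue(List<CompletableFuture<?>> futures) {
        long count = 0;
        for (CompletableFuture<?> future : futures) {
            if (future.isDone())
                continue;
            // Returns false if another thread completed the future after the isDone() check.
            if (future.completeExceptionally(new TimeoutException("consumer closed")))
                count++;
        }
        return count;
    }

    // Assumed "existing" style: counts every future that was not done at check time,
    // ignoring the return value of completeExceptionally().
    static long countUnconditionally(List<CompletableFuture<?>> futures) {
        long count = 0;
        for (CompletableFuture<?> future : futures) {
            if (future.isDone())
                continue;
            future.completeExceptionally(new TimeoutException("consumer closed"));
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        futures.add(new CompletableFuture<String>());          // still pending
        futures.add(CompletableFuture.completedFuture("done")); // already complete
        System.out.println(countByReturnValue(futures));        // prints 1
    }
}
```

   Without concurrent completion the two variants return the same number. They can differ only when another thread completes a future between the `isDone()` check and the `completeExceptionally()` call, in which case the return-value variant does not count that event.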