lucasbru commented on code in PR #16272: URL: https://github.com/apache/kafka/pull/16272#discussion_r1638000934
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1278,12 +1278,10 @@ void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstEx
         autoCommitSync(timer);
         applicationEventHandler.add(new CommitOnCloseEvent());
-        completeQuietly(
-            () -> {
-                maybeRevokePartitions();
-                applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer)));
-            },
-            "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
+        completeQuietly(() -> maybeRevokePartitions(),

Review Comment:
   So the point here seems to be that if `maybeRevokePartitions` fails, most likely because of the rebalance listener, we still leave the group. It looks like a good change to me. I'm just surprised that the legacy consumer does not seem to do this: if `onPrepareLeave` in `AbstractCoordinator` fails, we won't reach `maybeLeaveGroup`. So is this also a bug in the legacy consumer?

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -158,7 +158,12 @@ public class AsyncKafkaConsumerTest {
     public void resetAll() {
         backgroundEventQueue.clear();
         if (consumer != null) {
-            consumer.close(Duration.ZERO);
+            try {
+                consumer.close(Duration.ZERO);
+            } catch (Exception e) {
+                // best effort to clean up after each test, but may throw (ex. if callbacks where
+                // throwing errors)

Review Comment:
   I did this before but was asked not to, see https://github.com/apache/kafka/pull/15613/files/9fb917e4b1e60f238183c92d1ad3bc2565a7e1ea#r1559907295. That's why I added a "clean-up close", with an expected exception, to the tests where close fails (search for "clean-up" in this file). I'd also be fine with your approach (which was also my original one) of doing a best-effort clean-up and ignoring exceptions here. But then let's remove the "clean-up close" code in the other tests. Any consistent approach is fine with me here.
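To make the scenario in the `prepareShutdown` comment concrete, here is a minimal, self-contained sketch. It is plain Java with no Kafka classes: `ShutdownOrderSketch`, `coupled`, `isolated`, and `runGuarded` are hypothetical names, and the two `Runnable`s only stand in for `maybeRevokePartitions` and the leave-group step. It contrasts running both steps inside one guarded block with guarding each step separately, and is not meant to mirror the actual consumer code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names are Kafka APIs.
class ShutdownOrderSketch {

    // Both steps share one guarded block: if the first step throws,
    // the second step is never attempted.
    static List<String> coupled(Runnable revoke, Runnable leave) {
        List<String> ran = new ArrayList<>();
        try {
            revoke.run();
            ran.add("revoke");
            leave.run();
            ran.add("leave");
        } catch (RuntimeException e) {
            ran.add("failed: " + e.getMessage());
        }
        return ran;
    }

    // Each step is guarded on its own: a failure in the first step is
    // recorded, and the second step still runs.
    static List<String> isolated(Runnable revoke, Runnable leave) {
        List<String> ran = new ArrayList<>();
        runGuarded("revoke", revoke, ran);
        runGuarded("leave", leave, ran);
        return ran;
    }

    private static void runGuarded(String name, Runnable step, List<String> ran) {
        try {
            step.run();
            ran.add(name);
        } catch (RuntimeException e) {
            ran.add(name + " failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a rebalance listener callback that throws.
        Runnable failingRevoke = () -> { throw new IllegalStateException("listener threw"); };
        Runnable leave = () -> { };

        System.out.println(coupled(failingRevoke, leave));  // leave step is skipped
        System.out.println(isolated(failingRevoke, leave)); // leave step still runs
    }
}
```

With a throwing revoke step, `coupled` never reaches the leave step, while `isolated` records the failure and still runs it. That is the behaviour the split `completeQuietly` calls appear to be aiming for.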
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -673,7 +677,15 @@ public CompletableFuture<Void> leaveGroup() {
         CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
         callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("Member {} callback to release assignment failed. Member will proceed " +
+                    "to send leave group heartbeat", memberId, error);
+            } else {
+                log.debug("Member {} completed callback to release assignment and will send leave " +
+                    "group heartbeat", memberId);
+            }
             // Clear the subscription, no matter if the callback execution failed or succeeded.
+            subscriptions.unsubscribe();
             clearSubscription();

Review Comment:
   Is the name `clearSubscription` misleading? It seems like it clears the assignment, not the subscription.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1324,6 +1322,7 @@ void completeQuietly(final Utils.ThrowingRunnable function,
         } catch (TimeoutException e) {
             log.debug("Timeout expired before the {} operation could complete.", msg);

Review Comment:
   Should we update `firstException` here?
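On the question of whether the timeout branch of `completeQuietly` should update `firstException`, one possible answer can be sketched as below. This is a standalone illustration under stated assumptions, not the actual Kafka method: `FirstExceptionSketch` and `ThrowingStep` are hypothetical names (the latter standing in for `Utils.ThrowingRunnable`), `java.util.concurrent.TimeoutException` stands in for Kafka's own timeout exception, and logging is reduced to `System.err`.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; not the AsyncKafkaConsumer implementation.
class FirstExceptionSketch {

    // Stand-in for a runnable that may throw checked exceptions.
    interface ThrowingStep {
        void run() throws Exception;
    }

    // Runs the step without propagating failures. Every failure, including a
    // timeout, is offered to firstException; compareAndSet(null, e) keeps only
    // the first one that was recorded.
    static void completeQuietly(ThrowingStep step,
                                String msg,
                                AtomicReference<Throwable> firstException) {
        try {
            step.run();
        } catch (TimeoutException e) {
            System.err.println("Timeout expired before the " + msg + " operation could complete.");
            // The option under discussion: surface the timeout to the caller as well.
            firstException.compareAndSet(null, e);
        } catch (Exception e) {
            firstException.compareAndSet(null, e);
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();

        completeQuietly(() -> { throw new TimeoutException("deadline passed"); },
                "leave group", firstException);
        completeQuietly(() -> { throw new IllegalStateException("later failure"); },
                "close", firstException);

        // Only the earliest failure is retained.
        System.out.println(firstException.get());
    }
}
```

In this sketch the timeout from the first step is what the caller eventually sees, and the later failure is dropped. Whether a timeout during close should be reported this way, or only logged, is the design choice the question raises.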