lianetm commented on code in PR #16686: URL: https://github.com/apache/kafka/pull/16686#discussion_r1722392410
########## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ########## @@ -911,4 +911,42 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(100))) } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): Unit = { + val adminClient = createAdminClient() + val consumer = createConsumer() + val groupId = consumerConfig.getProperty("group.id") + + def hasMembers: Boolean = { + try { + val groupDescription = adminClient.describeConsumerGroups (Collections.singletonList (groupId) ).describedGroups.get (groupId).get + groupDescription.members.size() > 0 + } catch { + case _: ExecutionException | _: InterruptedException => + false + } + } + + val listener = new TestConsumerReassignmentListener() + consumer.subscribe(List(topic).asJava, listener) + awaitRebalance(consumer, listener) + + assertEquals(1, listener.callsToAssigned) + assertEquals(0, listener.callsToRevoked) + TestUtils.waitUntilTrue(() => hasMembers, s"Consumer did not join the consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of subscribe") + + try { + Thread.currentThread().interrupt() + assertThrows(classOf[InterruptException], () => consumer.close()) + } finally { + // Clear the interrupted flag so we don't create problems for subsequent tests. + Thread.interrupted() + } + + assertEquals(1, listener.callsToAssigned) + assertEquals(1, listener.callsToRevoked) + TestUtils.waitUntilTrue(() => !hasMembers, s"Consumer did not leave the consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of interrupt/close") Review Comment: Leaving the default session timeout and waiting for half of it definitely achieves the same result. 
The downside is that waiting for only half of it may not be enough for the group to appear empty to the admin client. While investigating this when the issue first came up, we confirmed that the classic consumer does send a leave-group request even when interrupted. That's why I still expect we should be able to have a test for it. It feels like we're close, given that you confirm it consistently passes locally; maybe it's just missing a tweak to pass in CI? I'm totally OK with leaving it disabled for the classic consumer for now and focusing on the fix for the new consumer, but should we file a JIRA to enable it for the classic consumer? (I'd understand if it simply needs more time; otherwise, what's the reason for it failing if the consumer does send the leave request?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org