lianetm commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1722392410


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -911,4 +911,42 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(100)))
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): Unit = {
+    val adminClient = createAdminClient()
+    val consumer = createConsumer()
+    val groupId = consumerConfig.getProperty("group.id")
+
+    def hasMembers: Boolean = {
+      try {
+        val groupDescription = adminClient.describeConsumerGroups (Collections.singletonList (groupId) ).describedGroups.get (groupId).get
+        groupDescription.members.size() > 0
+      } catch {
+        case _: ExecutionException | _: InterruptedException =>
+          false
+      }
+    }
+
+    val listener = new TestConsumerReassignmentListener()
+    consumer.subscribe(List(topic).asJava, listener)
+    awaitRebalance(consumer, listener)
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(0, listener.callsToRevoked)
+    TestUtils.waitUntilTrue(() => hasMembers, s"Consumer did not join the consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of subscribe")
+
+    try {
+      Thread.currentThread().interrupt()
+      assertThrows(classOf[InterruptException], () => consumer.close())
+    } finally {
+      // Clear the interrupted flag so we don't create problems for subsequent tests.
+      Thread.interrupted()
+    }
+
+    assertEquals(1, listener.callsToAssigned)
+    assertEquals(1, listener.callsToRevoked)
+    TestUtils.waitUntilTrue(() => !hasMembers, s"Consumer did not leave the consumer group within ${JTestUtils.DEFAULT_MAX_WAIT_MS} of interrupt/close")

Review Comment:
   Leaving the default session timeout and waiting for half of it definitely
achieves the same result. The downside is that half of the session timeout may
not be long enough for the group to appear empty to the admin client.
   
   While playing with this when the issue first came up, we confirmed that the
classic consumer does send a leave group request even if interrupted. That's
why I still expect we should be able to have a test for it. It feels like we're
close, given that you confirm it consistently passes locally; maybe we're just
missing a tweak to get it passing in CI?
   
   Totally OK with me to leave it disabled for the classic consumer for now and
focus on the fix for the new consumer, but should we file a Jira to enable it
for the classic consumer? (That would let us understand whether it simply needs
more time, or why it fails if the consumer does send the leave.)
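
   To make the timing concern concrete, here is a minimal sketch of polling a
condition against a deadline instead of sleeping for a fixed interval. It is
not Kafka's `TestUtils.waitUntilTrue`; the names `WaitSketch`, `waitUntil`,
`maxWaitMs` and `pauseMs` are hypothetical, and in the test above the condition
would be something like `() => !hasMembers`.

```scala
import scala.annotation.tailrec

object WaitSketch {
  /**
   * Polls `condition` until it returns true or `maxWaitMs` elapses.
   * Returns true if the condition was met before the deadline.
   * Hypothetical helper for illustration only.
   */
  def waitUntil(condition: () => Boolean, maxWaitMs: Long, pauseMs: Long = 100L): Boolean = {
    val deadline = System.nanoTime() + maxWaitMs * 1000000L

    @tailrec
    def loop(): Boolean =
      if (condition()) true
      // Subtract before comparing so the check is safe against nanoTime overflow.
      else if (System.nanoTime() - deadline >= 0) false
      else {
        Thread.sleep(pauseMs)
        loop()
      }

    loop()
  }
}
```

With a fixed wait of half the session timeout, the check runs once and fails if
the admin client still reports a member at that moment. With polling, the check
passes as soon as the group is observed empty, so a larger `maxWaitMs` costs
extra time only when the test is going to fail anyway.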



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
