dajac commented on code in PR #18513:
URL: https://github.com/apache/kafka/pull/18513#discussion_r1938990654


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4004,6 +3922,82 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       ConfigSource.DYNAMIC_TOPIC_CONFIG, false, false, 
Collections.emptyList(), null, null),
       topicConfigs.get(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG))
   }
+
+  class BackgroundConsumerSet(testGroupId: String, testClientId: String, 
defaultConsumerConfig: Properties) {
+    private val consumerSet: 
scala.collection.mutable.Set[Consumer[Array[Byte], Array[Byte]]] = 
scala.collection.mutable.Set.empty
+    private val consumerThreads: scala.collection.mutable.Set[Thread] = 
scala.collection.mutable.Set.empty
+    private var startLatch: CountDownLatch = new CountDownLatch(0)
+    private var stopLatch: CountDownLatch = new CountDownLatch(0)
+    private var consumerThreadRunning = new AtomicBoolean(false)
+
+    defaultConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+    defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+
+    def addConsumer(groupInstanceId: String, topic: String, configOverrides: 
Properties = new Properties()): Unit = {
+      val newConsumerConfig = 
defaultConsumerConfig.clone().asInstanceOf[Properties]
+      if (groupInstanceId != "") {
+        // static member
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId)
+      }
+      newConsumerConfig.putAll(configOverrides)
+
+      val consumer = createConsumer(configOverrides = newConsumerConfig)
+      val consumerThread = createConsumerThread(consumer, topic)
+      consumerSet.add(consumer)

Review Comment:
   Sounds good. I have one minor thought. I wonder whether we should turn the 
consumer thread into an explicit class. This would allow us to access the 
consumer reference and hence avoid the need to keep two lists here. What do 
you think?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1838,250 +1838,218 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         Utils.closeQuietly(producer, "producer")
       }
 
-      val EMPTY_GROUP_INSTANCE_ID = ""
       val testGroupId = "test_group_id"
       val testClientId = "test_client_id"
       val testInstanceId1 = "test_instance_id_1"
       val testInstanceId2 = "test_instance_id_2"
       val fakeGroupId = "fake_group_id"
 
-      def createProperties(groupInstanceId: String): Properties = {
-        val newConsumerConfig = new Properties(consumerConfig)
-        // We need to disable the auto commit because after the members got 
removed from group, the offset commit
-        // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
-        
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
-        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
-        if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
-          
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId)
-        }
-        newConsumerConfig
-      }
-
       // contains two static members and one dynamic member
-      val groupInstanceSet = Set(testInstanceId1, testInstanceId2, 
EMPTY_GROUP_INSTANCE_ID)
-      val consumerSet = groupInstanceSet.map { groupInstanceId => 
createConsumer(configOverrides = createProperties(groupInstanceId))}
+      val groupInstanceSet = Set(testInstanceId1, testInstanceId2, "")
       val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
 
-      val latch = new CountDownLatch(consumerSet.size)
-      try {
-        def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): 
Thread = {
-          new Thread {
-            override def run : Unit = {
-              consumer.subscribe(Collections.singleton(topic))
-              try {
-                while (true) {
-                  consumer.poll(JDuration.ofSeconds(5))
-                  if (!consumer.assignment.isEmpty && latch.getCount > 0L)
-                    latch.countDown()
-                  try {
-                    consumer.commitSync()
-                  } catch {
-                    case _: CommitFailedException => // Ignore and retry on 
next iteration.
-                  }
-                }
-              } catch {
-                case _: InterruptException => // Suppress the output to stderr
-              }
-            }
-          }
+      // We need to disable the auto commit because after the members got 
removed from group, the offset commit
+      // will cause the member rejoining and the test will be flaky (check 
ConsumerCoordinator#OffsetCommitResponseHandler)
+      val defaultConsumerConfig = new Properties(consumerConfig)
+      
defaultConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false")
+      defaultConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+
+      val backgroundConsumerSet = new BackgroundConsumerSet(testGroupId, 
defaultConsumerConfig)
+      groupInstanceSet.zip(topicSet).foreach { case (groupInstanceId, topic) =>
+        val configOverrides = new Properties()
+        if (groupInstanceId != "") {
+          // static member
+          configOverrides.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId)
         }
+        backgroundConsumerSet.addConsumer(topic, configOverrides)
+      }
 
-        // Start consumers in a thread that will subscribe to a new group.
-        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createConsumerThread(zipped._1, zipped._2))
+      try {
         val groupType = if 
(groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) 
GroupType.CONSUMER else GroupType.CLASSIC
+        // Start consumer polling threads in the background
+        backgroundConsumerSet.start()
 
-        try {
-          consumerThreads.foreach(_.start())
-          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
-          // Test that we can list the new group.
-          TestUtils.waitUntilTrue(() => {
-            val matching = 
client.listConsumerGroups.all.get.asScala.filter(group =>
-                group.groupId == testGroupId &&
-                group.groupState.get == GroupState.STABLE)
-            matching.size == 1
-          }, s"Expected to be able to list $testGroupId")
-
-          TestUtils.waitUntilTrue(() => {
-            val options = new 
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
-            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
-                group.groupId == testGroupId &&
-                group.groupState.get == GroupState.STABLE)
-            matching.size == 1
-          }, s"Expected to be able to list $testGroupId in group type 
$groupType")
-
-          TestUtils.waitUntilTrue(() => {
-            val options = new 
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
-              .inGroupStates(Set(GroupState.STABLE).asJava)
-            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+        // Test that we can list the new group.
+        TestUtils.waitUntilTrue(() => {
+          val matching = 
client.listConsumerGroups.all.get.asScala.filter(group =>
               group.groupId == testGroupId &&
-                group.groupState.get == GroupState.STABLE)
-            matching.size == 1
-          }, s"Expected to be able to list $testGroupId in group type 
$groupType and state Stable")
-
-          TestUtils.waitUntilTrue(() => {
-            val options = new 
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava)
-            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
-                group.groupId == testGroupId &&
-                group.groupState.get == GroupState.STABLE)
-            matching.size == 1
-          }, s"Expected to be able to list $testGroupId in state Stable")
-
-          TestUtils.waitUntilTrue(() => {
-            val options = new 
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava)
-            val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(
-                _.groupId == testGroupId)
-            matching.isEmpty
-          }, s"Expected to find zero groups")
-
-          val describeWithFakeGroupResult = 
client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
-            new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
-          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
-
-          // Test that we can get information about the test consumer group.
-          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
-          var testGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
-          if (groupType == GroupType.CLASSIC) {
-            assertTrue(testGroupDescription.groupEpoch.isEmpty)
-            assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty)
-          } else {
-            assertEquals(Optional.of(3), testGroupDescription.groupEpoch)
-            assertEquals(Optional.of(3), 
testGroupDescription.targetAssignmentEpoch)
-          }
+              group.groupState.get == GroupState.STABLE)
+          matching.size == 1
+        }, s"Expected to be able to list $testGroupId")
 
-          assertEquals(testGroupId, testGroupDescription.groupId())
-          assertFalse(testGroupDescription.isSimpleConsumerGroup)
-          assertEquals(groupInstanceSet.size, 
testGroupDescription.members().size())
-          val members = testGroupDescription.members()
-          members.asScala.foreach { member =>
-            assertEquals(testClientId, member.clientId)
-            assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty 
else Optional.of(true), member.upgraded)
-          }
-          val topicPartitionsByTopic = 
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
-          topicSet.foreach { topic =>
-            val topicPartitions = topicPartitionsByTopic.getOrElse(topic, 
List.empty)
-            assertEquals(testNumPartitions, topicPartitions.size)
-          }
+        TestUtils.waitUntilTrue(() => {
+          val options = new 
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
+          val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+            group.groupId == testGroupId &&
+              group.groupState.get == GroupState.STABLE)
+          matching.size == 1
+        }, s"Expected to be able to list $testGroupId in group type 
$groupType")
 
-          val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
-          assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
-
-          // Test that the fake group throws GroupIdNotFoundException
-          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
-          
assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId),
 classOf[GroupIdNotFoundException],
-            s"Group $fakeGroupId not found.")
-
-          // Test that all() also throws GroupIdNotFoundException
-          assertFutureThrows(describeWithFakeGroupResult.all(), 
classOf[GroupIdNotFoundException],
-            s"Group $fakeGroupId not found.")
-
-          val testTopicPart0 = new TopicPartition(testTopicName, 0)
-
-          // Test listConsumerGroupOffsets
-          TestUtils.waitUntilTrue(() => {
-            val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
-            parts.containsKey(testTopicPart0) && 
(parts.get(testTopicPart0).offset() == 1)
-          }, s"Expected the offset for partition 0 to eventually become 1.")
-
-          // Test listConsumerGroupOffsets with requireStable true
-          val options = new 
ListConsumerGroupOffsetsOptions().requireStable(true)
-          var parts = client.listConsumerGroupOffsets(testGroupId, options)
-            .partitionsToOffsetAndMetadata().get()
-          assertTrue(parts.containsKey(testTopicPart0))
-          assertEquals(1, parts.get(testTopicPart0).offset())
-
-          // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
-          val groupSpecs = Collections.singletonMap(testGroupId,
-            new 
ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singleton(new 
TopicPartition(testTopicName, 0))))
-          parts = 
client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get()
-          assertTrue(parts.containsKey(testTopicPart0))
-          assertEquals(1, parts.get(testTopicPart0).offset())
-
-          // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec 
and requireStable option
-          parts = client.listConsumerGroupOffsets(groupSpecs, 
options).partitionsToOffsetAndMetadata().get()
-          assertTrue(parts.containsKey(testTopicPart0))
-          assertEquals(1, parts.get(testTopicPart0).offset())
-
-          // Test delete non-exist consumer instance
-          val invalidInstanceId = "invalid-instance-id"
-          var removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, new 
RemoveMembersFromConsumerGroupOptions(
-            Collections.singleton(new MemberToRemove(invalidInstanceId))
-          ))
-
-          assertFutureThrows(removeMembersResult.all, 
classOf[UnknownMemberIdException])
-          val firstMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(invalidInstanceId))
-          assertFutureThrows(firstMemberFuture, 
classOf[UnknownMemberIdException])
-
-          // Test consumer group deletion
-          var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, 
fakeGroupId).asJava)
-          assertEquals(2, deleteResult.deletedGroups().size())
-
-          // Deleting the fake group ID should get GroupIdNotFoundException.
-          assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
-          assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
-            classOf[GroupIdNotFoundException])
-
-          // Deleting the real group ID should get GroupNotEmptyException
-          assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
-          assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
-            classOf[GroupNotEmptyException])
+        TestUtils.waitUntilTrue(() => {
+          val options = new 
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
+            .inGroupStates(Set(GroupState.STABLE).asJava)
+          val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+            group.groupId == testGroupId &&
+              group.groupState.get == GroupState.STABLE)
+          matching.size == 1
+        }, s"Expected to be able to list $testGroupId in group type $groupType 
and state Stable")
 
-          // Test delete one correct static member
-          val removeOptions = new 
RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1)))
-          removeOptions.reason("test remove")
-          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
-
-          assertNull(removeMembersResult.all().get())
-          val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId1))
-          assertNull(validMemberFuture.get())
-
-          val describeTestGroupResult = 
client.describeConsumerGroups(Seq(testGroupId).asJava,
-            new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
-          assertEquals(1, describeTestGroupResult.describedGroups().size())
-
-          testGroupDescription = 
describeTestGroupResult.describedGroups().get(testGroupId).get()
-
-          assertEquals(testGroupId, testGroupDescription.groupId)
-          assertFalse(testGroupDescription.isSimpleConsumerGroup)
-          assertEquals(consumerSet.size - 1, 
testGroupDescription.members().size())
-
-          // Delete all active members remaining (a static member + a dynamic 
member)
-          removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, new 
RemoveMembersFromConsumerGroupOptions())
-          assertNull(removeMembersResult.all().get())
-
-          // The group should contain no members now.
-          testGroupDescription = 
client.describeConsumerGroups(Seq(testGroupId).asJava,
-            new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
-              .describedGroups().get(testGroupId).get()
-          assertTrue(testGroupDescription.members().isEmpty)
-
-          // Consumer group deletion on empty group should succeed
-          deleteResult = client.deleteConsumerGroups(Seq(testGroupId).asJava)
-          assertEquals(1, deleteResult.deletedGroups().size())
-
-          assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
-          assertNull(deleteResult.deletedGroups().get(testGroupId).get())
-
-          // Test alterConsumerGroupOffsets
-          val alterConsumerGroupOffsetsResult = 
client.alterConsumerGroupOffsets(testGroupId,
-            Collections.singletonMap(testTopicPart0, new 
OffsetAndMetadata(0L)))
-          assertNull(alterConsumerGroupOffsetsResult.all().get())
-          
assertNull(alterConsumerGroupOffsetsResult.partitionResult(testTopicPart0).get())
-
-          // Verify alterConsumerGroupOffsets success
-          TestUtils.waitUntilTrue(() => {
-            val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
-            parts.containsKey(testTopicPart0) && 
(parts.get(testTopicPart0).offset() == 0)
-          }, s"Expected the offset for partition 0 to eventually become 0.")
-      } finally {
-        consumerThreads.foreach {
-          case consumerThread =>
-            consumerThread.interrupt()
-            consumerThread.join()
+        TestUtils.waitUntilTrue(() => {
+          val options = new 
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava)
+          val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+              group.groupState.get == GroupState.STABLE)
+          matching.size == 1
+        }, s"Expected to be able to list $testGroupId in state Stable")
+
+        TestUtils.waitUntilTrue(() => {
+          val options = new 
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava)
+          val matching = 
client.listConsumerGroups(options).all.get.asScala.filter(
+              _.groupId == testGroupId)
+          matching.isEmpty
+        }, s"Expected to find zero groups")
+
+        val describeWithFakeGroupResult = 
client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
+          new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
+        assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+
+        // Test that we can get information about the test consumer group.
+        
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
+        var testGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+        if (groupType == GroupType.CLASSIC) {
+          assertTrue(testGroupDescription.groupEpoch.isEmpty)
+          assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty)
+        } else {
+          assertEquals(Optional.of(3), testGroupDescription.groupEpoch)
+          assertEquals(Optional.of(3), 
testGroupDescription.targetAssignmentEpoch)
         }
-      }
+
+        assertEquals(testGroupId, testGroupDescription.groupId())
+        assertFalse(testGroupDescription.isSimpleConsumerGroup)
+        assertEquals(groupInstanceSet.size, 
testGroupDescription.members().size())
+        val members = testGroupDescription.members()
+        members.asScala.foreach { member =>
+          assertEquals(testClientId, member.clientId)
+          assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty else 
Optional.of(true), member.upgraded)
+        }
+        val topicPartitionsByTopic = 
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
+        topicSet.foreach { topic =>
+          val topicPartitions = topicPartitionsByTopic.getOrElse(topic, 
List.empty)
+          assertEquals(testNumPartitions, topicPartitions.size)
+        }
+
+        val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
+        assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
+
+        // Test that the fake group throws GroupIdNotFoundException
+        
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
+        
assertFutureThrows(describeWithFakeGroupResult.describedGroups().get(fakeGroupId),
 classOf[GroupIdNotFoundException],
+          s"Group $fakeGroupId not found.")
+
+        // Test that all() also throws GroupIdNotFoundException
+        assertFutureThrows(describeWithFakeGroupResult.all(), 
classOf[GroupIdNotFoundException],
+          s"Group $fakeGroupId not found.")
+
+        val testTopicPart0 = new TopicPartition(testTopicName, 0)
+
+        // Test listConsumerGroupOffsets
+        TestUtils.waitUntilTrue(() => {
+          val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+          parts.containsKey(testTopicPart0) && 
(parts.get(testTopicPart0).offset() == 1)
+        }, s"Expected the offset for partition 0 to eventually become 1.")
+
+        // Test listConsumerGroupOffsets with requireStable true
+        val options = new ListConsumerGroupOffsetsOptions().requireStable(true)
+        var parts = client.listConsumerGroupOffsets(testGroupId, options)
+          .partitionsToOffsetAndMetadata().get()
+        assertTrue(parts.containsKey(testTopicPart0))
+        assertEquals(1, parts.get(testTopicPart0).offset())
+
+        // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
+        val groupSpecs = Collections.singletonMap(testGroupId,
+          new 
ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singleton(new 
TopicPartition(testTopicName, 0))))
+        parts = 
client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get()
+        assertTrue(parts.containsKey(testTopicPart0))
+        assertEquals(1, parts.get(testTopicPart0).offset())
+
+        // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and 
requireStable option
+        parts = client.listConsumerGroupOffsets(groupSpecs, 
options).partitionsToOffsetAndMetadata().get()
+        assertTrue(parts.containsKey(testTopicPart0))
+        assertEquals(1, parts.get(testTopicPart0).offset())
+
+        // Test delete non-exist consumer instance
+        val invalidInstanceId = "invalid-instance-id"
+        var removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, new 
RemoveMembersFromConsumerGroupOptions(
+          Collections.singleton(new MemberToRemove(invalidInstanceId))
+        ))
+
+        assertFutureThrows(removeMembersResult.all, 
classOf[UnknownMemberIdException])
+        val firstMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(invalidInstanceId))
+        assertFutureThrows(firstMemberFuture, 
classOf[UnknownMemberIdException])
+
+        // Test consumer group deletion
+        var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, 
fakeGroupId).asJava)
+        assertEquals(2, deleteResult.deletedGroups().size())
+
+        // Deleting the fake group ID should get GroupIdNotFoundException.
+        assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
+        assertFutureThrows(deleteResult.deletedGroups().get(fakeGroupId),
+          classOf[GroupIdNotFoundException])
+
+        // Deleting the real group ID should get GroupNotEmptyException
+        assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
+        assertFutureThrows(deleteResult.deletedGroups().get(testGroupId),
+          classOf[GroupNotEmptyException])
+
+        // Stop the consumer threads and close consumers to prevent rejoining.
+        // dynamic member will be removed, leaving two static members in the 
group
+        backgroundConsumerSet.stop()
+
+        val describeTestGroupResult = 
client.describeConsumerGroups(Seq(testGroupId).asJava,
+          new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
+        assertEquals(1, describeTestGroupResult.describedGroups().size())
+
+        testGroupDescription = 
describeTestGroupResult.describedGroups().get(testGroupId).get()
+        assertEquals(testGroupId, testGroupDescription.groupId)
+        assertFalse(testGroupDescription.isSimpleConsumerGroup)
+        assertEquals(groupInstanceSet.size - 1, 
testGroupDescription.members().size())
+
+        // Test delete one correct static member
+        removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId,
+          new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new 
MemberToRemove(testInstanceId1))))
+
+        assertNull(removeMembersResult.all().get())
+        assertNull(removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId1)).get())
+
+        // Delete all active members remaining (a static member)
+        removeMembersResult = 
client.removeMembersFromConsumerGroup(testGroupId, new 
RemoveMembersFromConsumerGroupOptions())

Review Comment:
   It is tricky to reliably test this because we need to shut down the consumers 
without making them leave the group. Otherwise, there is always a chance of 
having them rejoin the group before we run the assertions.
   
   This actually made me think that we could use 
`internal.leave.group.on.close` for this purpose, so I am +1 for adding a small 
and separate integration test for this use case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to