[ 
https://issues.apache.org/jira/browse/KAFKA-7196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638459#comment-16638459
 ] 

ASF GitHub Bot commented on KAFKA-7196:
---------------------------------------

lindong28 closed pull request #5556: KAFKA-7196: Remove heartbeat delayed 
operation for those removed consumers at the end of each rebalance
URL: https://github.com/apache/kafka/pull/5556
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index c4e6dc97137..db89f14592f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -775,6 +775,7 @@ class GroupCoordinator(val brokerId: Int,
     group.inLock {
       // remove any members who haven't joined the group yet
       group.notYetRejoinedMembers.foreach { failedMember =>
+        removeHeartbeatForLeavingMember(group, failedMember)
         group.remove(failedMember.memberId)
         // TODO: cut the socket connection to the client
       }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 608d7cc997f..efa44fa4ae8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -527,11 +527,29 @@ class GroupCoordinatorTest extends JUnitSuite {
     heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
     assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
 
-    // now timeout the rebalance, which should kick the unjoined member out of the group
-    // and let the rebalance finish with only the new member
+    // now timeout the rebalance
     timer.advanceClock(500)
     val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    val otherMemberId = otherJoinResult.memberId
+    val otherGenerationId = otherJoinResult.generationId
+    EasyMock.reset(replicaManager)
+    val syncResult = syncGroupLeader(groupId, otherGenerationId, otherMemberId, Map(otherMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncResult._2)
+
+    // the unjoined member should be kicked out from the group
     assertEquals(Errors.NONE, otherJoinResult.error)
+    EasyMock.reset(replicaManager)
+    heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+    // the joined member should get heart beat response with no error. Let the new member keep heartbeating for a while
+    // to verify that no new rebalance is triggered unexpectedly
+    for ( _ <-  1 to 20) {
+      timer.advanceClock(500)
+      EasyMock.reset(replicaManager)
+      heartbeatResult = heartbeat(groupId, otherMemberId, otherGenerationId)
+      assertEquals(Errors.NONE, heartbeatResult)
+    }
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Remove heartbeat delayed operation for those removed consumers at the end of 
> each rebalance
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7196
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7196
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, purgatory
>            Reporter: Lincong Li
>            Assignee: Lincong Li
>            Priority: Minor
>             Fix For: 2.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> During a consumer group rebalance, when the join-group phase finishes, 
> the delayed heartbeat operation of any consumer that failed to rejoin the 
> group should be removed from the purgatory. Otherwise, even though the 
> consumer's member ID has been removed from the group, its delayed heartbeat 
> operation remains registered in the purgatory, eventually times out, and 
> triggers another, unnecessary rebalance.
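
The failure mode described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyGroup`, `removeFailedMember`, `advanceTo` and `ToyGroupDemo` are hypothetical names invented for illustration, and none of this is Kafka's actual GroupCoordinator or purgatory code. It models the purgatory as a map from member ID to heartbeat deadline and counts every expired heartbeat as one triggered rebalance.

```scala
import scala.collection.mutable

// Toy model only (hypothetical names, not Kafka classes).
final class ToyGroup {
  private val members = mutable.Set.empty[String]
  // Stand-in for the purgatory: memberId -> heartbeat deadline in ms.
  private val pendingHeartbeats = mutable.Map.empty[String, Long]
  var rebalancesTriggered = 0

  def add(memberId: String, deadlineMs: Long): Unit = {
    members += memberId
    pendingHeartbeats(memberId) = deadlineMs
  }

  // Remove a member that failed to rejoin. With cancelHeartbeat = false the
  // pending heartbeat is left behind, modelling the behaviour before the fix.
  def removeFailedMember(memberId: String, cancelHeartbeat: Boolean): Unit = {
    if (cancelHeartbeat) pendingHeartbeats.remove(memberId)
    members -= memberId
  }

  // Expire every pending heartbeat whose deadline has passed. In this toy
  // model each expiration counts as one triggered rebalance.
  def advanceTo(nowMs: Long): Unit = {
    val expired = pendingHeartbeats.collect {
      case (id, deadline) if deadline <= nowMs => id
    }.toList
    expired.foreach { id =>
      pendingHeartbeats.remove(id)
      members -= id
      rebalancesTriggered += 1
    }
  }
}

object ToyGroupDemo {
  // Returns how many rebalances the stale heartbeat caused.
  def run(cancelHeartbeat: Boolean): Int = {
    val group = new ToyGroup
    group.add("stale-member", deadlineMs = 1000L)
    group.removeFailedMember("stale-member", cancelHeartbeat)
    group.advanceTo(2000L)
    group.rebalancesTriggered
  }
}
```

In this model, `ToyGroupDemo.run(cancelHeartbeat = false)` leaves the stale entry in place, so its later expiration is counted as a rebalance, while `ToyGroupDemo.run(cancelHeartbeat = true)` cancels the entry together with the member and nothing expires afterwards.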



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
