dajac commented on code in PR #12308:
URL: https://github.com/apache/kafka/pull/12308#discussion_r1011828013

##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##########
@@ -1308,7 +1312,9 @@ class GroupCoordinator(val brokerId: Int,
     completeAndScheduleNextHeartbeatExpiration(group, member)
 
     val knownStaticMember = group.get(newMemberId)
-    group.updateMember(knownStaticMember, protocols, responseCallback)
+    val oldRebalanceTimeoutMs = knownStaticMember.rebalanceTimeoutMs
+    val oldSessionTimeoutMs = knownStaticMember.sessionTimeoutMs
+    group.updateMember(knownStaticMember, protocols, rebalanceTimeoutMs, 
sessionTimeoutMs, responseCallback)
     val oldProtocols = knownStaticMember.supportedProtocols

Review Comment:
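   Not related to your PR, but it seems that we should read the old protocols 
before updating the member, no? As written, `oldProtocols` is read only after 
`group.updateMember(...)` has been called with the new protocols. We may have a 
gap in our testing here.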



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -1036,6 +1036,48 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error)
   }
 
+ @Test
+ def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = {
+   val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
+   val joinGroupResult = staticJoinGroupWithPersistence(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, 
protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * 
DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE)
+   checkJoinGroupResult(joinGroupResult,
+     Errors.UNKNOWN_SERVER_ERROR,
+     rebalanceResult.generation,
+     Set.empty,
+     Stable,
+     Some(protocolType))
+   assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined)
+   val group = groupCoordinator.groupManager.getGroup(groupId).get
+   val it = group.allMemberMetadata.iterator
+   while (it.hasNext) {
+     val member = it.next()

Review Comment:
   nit: Should we use the following?
   
   ```
   group.allMemberMetadata.foreach { member =>
     assert....
   }
   ```



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -1036,6 +1036,48 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error)
   }
 
+ @Test
+ def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = {
+   val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
+   val joinGroupResult = staticJoinGroupWithPersistence(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, 
protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * 
DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE)
+   checkJoinGroupResult(joinGroupResult,
+     Errors.UNKNOWN_SERVER_ERROR,
+     rebalanceResult.generation,
+     Set.empty,
+     Stable,
+     Some(protocolType))
+   assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined)
+   val group = groupCoordinator.groupManager.getGroup(groupId).get
+   val it = group.allMemberMetadata.iterator
+   while (it.hasNext) {
+     val member = it.next()
+     assertEquals(member.sessionTimeoutMs, DefaultSessionTimeout)
+     assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout)
+   }
+ }
+
+
+  @Test
+  def staticMemberJoinStorageWithUnknownPersistenceSuccess(): Unit = {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
+    val followerJoinGroupResult = staticJoinGroupWithPersistence(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, 
protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * 
DefaultRebalanceTimeout)
+    checkJoinGroupResult(followerJoinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation,
+      Set.empty,
+      Stable,
+      Some(protocolType))
+    val leaderJoinGroupResult = staticJoinGroupWithPersistence(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, 
protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * 
DefaultRebalanceTimeout)
+    assertEquals(leaderJoinGroupResult.error, Errors.NONE)

Review Comment:
   Why don't you use `checkJoinGroupResult` here as well?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -1036,6 +1036,48 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error)
   }
 
+ @Test
+ def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = {
+   val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
+   val joinGroupResult = staticJoinGroupWithPersistence(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, 
protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * 
DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE)
+   checkJoinGroupResult(joinGroupResult,
+     Errors.UNKNOWN_SERVER_ERROR,
+     rebalanceResult.generation,
+     Set.empty,
+     Stable,
+     Some(protocolType))
+   assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined)
+   val group = groupCoordinator.groupManager.getGroup(groupId).get
+   val it = group.allMemberMetadata.iterator
+   while (it.hasNext) {
+     val member = it.next()
+     assertEquals(member.sessionTimeoutMs, DefaultSessionTimeout)
+     assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout)
+   }
+ }
+
+
+  @Test
+  def staticMemberJoinStorageWithUnknownPersistenceSuccess(): Unit = {
+    val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
+    val followerJoinGroupResult = staticJoinGroupWithPersistence(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, 
protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * 
DefaultRebalanceTimeout)
+    checkJoinGroupResult(followerJoinGroupResult,
+      Errors.NONE,
+      rebalanceResult.generation,
+      Set.empty,
+      Stable,
+      Some(protocolType))
+    val leaderJoinGroupResult = staticJoinGroupWithPersistence(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, 
protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * 
DefaultRebalanceTimeout)
+    assertEquals(leaderJoinGroupResult.error, Errors.NONE)
+    assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined)
+    val group = groupCoordinator.groupManager.getGroup(groupId).get
+    val it = group.allMemberMetadata.iterator
+    while (it.hasNext) {
+      val member = it.next()

Review Comment:
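   nit: Same comment as the previous one: should we use 
`group.allMemberMetadata.foreach { member => ... }` here instead of the 
explicit iterator loop?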



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -1036,6 +1036,48 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error)
   }
 
+ @Test
+ def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = {
+   val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
+   val joinGroupResult = staticJoinGroupWithPersistence(groupId, 
JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, 
protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * 
DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE)
+   checkJoinGroupResult(joinGroupResult,
+     Errors.UNKNOWN_SERVER_ERROR,
+     rebalanceResult.generation,
+     Set.empty,
+     Stable,
+     Some(protocolType))
+   assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined)
+   val group = groupCoordinator.groupManager.getGroup(groupId).get
+   val it = group.allMemberMetadata.iterator
+   while (it.hasNext) {
+     val member = it.next()
+     assertEquals(member.sessionTimeoutMs, DefaultSessionTimeout)
+     assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout)
+   }
+ }
+
+
+  @Test
+  def staticMemberJoinStorageWithUnknownPersistenceSuccess(): Unit = {

Review Comment:
   nit: Could we also rename this one? The name does not really explain what we 
are testing here.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala:
##########
@@ -1036,6 +1036,48 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error)
   }
 
+ @Test
+ def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = {

Review Comment:
   nit: The test name is not very descriptive. How about 
`staticMemberRejoinWithUpdatedSessionAndRebalanceTimeoutsButCannotPersistChange`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to