dajac commented on code in PR #12308: URL: https://github.com/apache/kafka/pull/12308#discussion_r1011828013
########## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ########## @@ -1308,7 +1312,9 @@ class GroupCoordinator(val brokerId: Int, completeAndScheduleNextHeartbeatExpiration(group, member) val knownStaticMember = group.get(newMemberId) - group.updateMember(knownStaticMember, protocols, responseCallback) + val oldRebalanceTimeoutMs = knownStaticMember.rebalanceTimeoutMs + val oldSessionTimeoutMs = knownStaticMember.sessionTimeoutMs + group.updateMember(knownStaticMember, protocols, rebalanceTimeoutMs, sessionTimeoutMs, responseCallback) val oldProtocols = knownStaticMember.supportedProtocols Review Comment: Not related to your PR, but it seems that we should get the old protocols before updating the member, no? We may have a gap in our testing here. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -1036,6 +1036,48 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error) } + @Test + def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE) + checkJoinGroupResult(joinGroupResult, + Errors.UNKNOWN_SERVER_ERROR, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) + val group = groupCoordinator.groupManager.getGroup(groupId).get + val it = group.allMemberMetadata.iterator + while (it.hasNext) { + val member = it.next() Review Comment: nit: Should we use the following? ``` group.allMemberMetadata.foreach { member => assert.... 
} ``` ########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -1036,6 +1036,48 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error) } + @Test + def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE) + checkJoinGroupResult(joinGroupResult, + Errors.UNKNOWN_SERVER_ERROR, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) + val group = groupCoordinator.groupManager.getGroup(groupId).get + val it = group.allMemberMetadata.iterator + while (it.hasNext) { + val member = it.next() + assertEquals(member.sessionTimeoutMs, DefaultSessionTimeout) + assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout) + } + } + + + @Test + def staticMemberJoinStorageWithUnknownPersistenceSuccess(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + val followerJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) + checkJoinGroupResult(followerJoinGroupResult, + Errors.NONE, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + val leaderJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) + 
assertEquals(leaderJoinGroupResult.error, Errors.NONE) Review Comment: Why don't you use `checkJoinGroupResult` here as well? ########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -1036,6 +1036,48 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error) } + @Test + def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE) + checkJoinGroupResult(joinGroupResult, + Errors.UNKNOWN_SERVER_ERROR, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) + val group = groupCoordinator.groupManager.getGroup(groupId).get + val it = group.allMemberMetadata.iterator + while (it.hasNext) { + val member = it.next() + assertEquals(member.sessionTimeoutMs, DefaultSessionTimeout) + assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout) + } + } + + + @Test + def staticMemberJoinStorageWithUnknownPersistenceSuccess(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + val followerJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) + checkJoinGroupResult(followerJoinGroupResult, + Errors.NONE, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + val leaderJoinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, 
clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout) + assertEquals(leaderJoinGroupResult.error, Errors.NONE) + assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) + val group = groupCoordinator.groupManager.getGroup(groupId).get + val it = group.allMemberMetadata.iterator + while (it.hasNext) { + val member = it.next() Review Comment: nit: same comment as the previous one. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -1036,6 +1036,48 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error) } + @Test + def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, 2 * DefaultSessionTimeout, 2 * DefaultRebalanceTimeout, appendRecordError = Errors.MESSAGE_TOO_LARGE) + checkJoinGroupResult(joinGroupResult, + Errors.UNKNOWN_SERVER_ERROR, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + assertTrue(groupCoordinator.groupManager.getGroup(groupId).isDefined) + val group = groupCoordinator.groupManager.getGroup(groupId).get + val it = group.allMemberMetadata.iterator + while (it.hasNext) { + val member = it.next() + assertEquals(member.sessionTimeoutMs, DefaultSessionTimeout) + assertEquals(member.rebalanceTimeoutMs, DefaultRebalanceTimeout) + } + } + + + @Test + def staticMemberJoinStorageWithUnknownPersistenceSuccess(): Unit = { Review Comment: nit: Could we also rename this one? The name does not really explain what we are testing here. 
########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -1036,6 +1036,48 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, syncGroupWithOldMemberIdResult.error) } + @Test + def staticMemberJoinStorageWithUnknownPersistenceFailure(): Unit = { Review Comment: nit: The test name is not so good here. How about `staticMemberRejoinWithUpdatedSessionAndRebalanceTimeoutsButCannotPersistChange`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org