dajac commented on code in PR #17286: URL: https://github.com/apache/kafka/pull/17286#discussion_r1778129613
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2727,33 +2782,39 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMember( ConsumerGroupMember member, T response ) { - if (validateOnlineDowngrade(group, member.memberId())) { - return convertToClassicGroup(group, member.memberId(), response); - } else { - List<CoordinatorRecord> records = new ArrayList<>(); - removeMember(records, group.groupId(), member.memberId()); + List<CoordinatorRecord> records = new ArrayList<>(); + removeMember(records, group.groupId(), member.memberId()); - // We update the subscription metadata without the leaving member. - Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata( - group.computeSubscribedTopicNames(member, null), - metadataImage.topics(), - metadataImage.cluster() - ); + // We update the subscription metadata without the leaving member. + Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata( + group.computeSubscribedTopicNames(member, null), + metadataImage.topics(), + metadataImage.cluster() + ); - if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { - log.info("[GroupId {}] Computed new subscription metadata: {}.", - group.groupId(), subscriptionMetadata); - records.add(newConsumerGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); - } + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId {}] Computed new subscription metadata: {}.", + group.groupId(), subscriptionMetadata); + records.add(newConsumerGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); + } - // We bump the group epoch. + // We bump the group epoch if the group doesn't need a downgrade, + // or the rebalance will be triggered after the downgrade conversion. 
+ if (!validateOnlineDowngradeWithFencedMember(group, member.memberId())) { int groupEpoch = group.groupEpoch() + 1; records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch)); + } - cancelTimers(group.groupId(), member.memberId()); + cancelTimers(group.groupId(), member.memberId()); - return new CoordinatorResult<>(records, response); - } + CompletableFuture<Void> appendFuture = new CompletableFuture<>(); + appendFuture.whenComplete((__, t) -> { + if (t == null) { + scheduleConsumerGroupDowngradeTimeout(group, true); Review Comment: I have a similar thought here. I wonder if we should just schedule it at L2806 in an else branch. We already know there that we need to downgrade the group. What do you think? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -3125,25 +3128,25 @@ public void testSessionTimeoutExpiration() { List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(45000 + 1); // Verify the expired timeout. + assertEquals(1, timeouts.size()); Review Comment: Is there a reason why we need to change this code? Ah, it may be because of the append future. Is it the case? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10944,15 +10947,225 @@ public void testLastConsumerProtocolMemberLeavingConsumerGroup() { ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false); assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); - // Simulate a failed write to the log. - result.appendFuture().completeExceptionally(new NotLeaderOrFollowerException()); + // Simulate a failed conversion. + downgradeTimeout.result.appendFuture().completeExceptionally(new NotLeaderOrFollowerException()); context.rollback(); // The group is reverted back to the consumer group. 
assertEquals(consumerGroup, context.groupMetadataManager.consumerGroup(groupId)); verify(context.metrics, times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, null); } + @Test + public void testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws ExecutionException, InterruptedException { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String instanceId = "instance-id"; + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + Arrays.asList(fooTopicName, barTopicName), + null, + Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(fooTopicName, 2), + new TopicPartition(barTopicName, 0), + new TopicPartition(barTopicName, 1) + ) + )))) + ); + + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setRebalanceTimeoutMs(45000) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(protocols) + ) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .build(); + ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2) + .setInstanceId(instanceId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .setServerAssignorName("range") + .setRebalanceTimeoutMs(45000) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5))) + .build(); + + // Consumer group with two members. + // Member 1 uses the classic protocol and static member 2 uses the consumer protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 2) + .addRacks() + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2), + mkTopicAssignment(barTopicId, 0, 1))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5))) + .withAssignmentEpoch(10)) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 2)); + } + })); + + context.commit(); + + // A new member using classic protocol with the same instance id joins, scheduling the downgrade. 
+ JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId(instanceId) + .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .withDefaultProtocolTypeAndProtocols() + .build(); + GroupMetadataManagerTestContext.JoinResult result = context.sendClassicGroupJoin(joinRequest); + result.appendFuture.complete(null); + memberId2 = result.joinFuture.get().memberId(); + + ExpiredTimeout<Void, CoordinatorRecord> downgradeTimeout = context.sleep(0).get(0); + assertEquals(consumerGroupDowngradeKey(groupId), downgradeTimeout.key); + + byte[] assignment1 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList( + new TopicPartition(fooTopicName, 0), + new TopicPartition(fooTopicName, 1), + new TopicPartition(fooTopicName, 2), + new TopicPartition(barTopicName, 0), + new TopicPartition(barTopicName, 1) + )))); + byte[] assignment2 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList( + new TopicPartition(fooTopicName, 3), + new TopicPartition(fooTopicName, 4), + new TopicPartition(fooTopicName, 5) + )))); + Map<String, byte[]> assignments = new HashMap<>(); + assignments.put(memberId1, assignment1); + assignments.put(memberId2, assignment2); + + ClassicGroup expectedClassicGroup = new ClassicGroup( + new LogContext(), + groupId, + STABLE, + context.time, + context.metrics, + 10, + Optional.of(ConsumerProtocol.PROTOCOL_TYPE), + Optional.of("range"), + Optional.of(memberId1), + Optional.of(context.time.milliseconds()) + ); + expectedClassicGroup.add( + new ClassicGroupMember( + memberId1, + Optional.ofNullable(member1.instanceId()), + member1.clientId(), + member1.clientHost(), + member1.rebalanceTimeoutMs(), + member1.classicProtocolSessionTimeout().get(), + ConsumerProtocol.PROTOCOL_TYPE, + member1.supportedJoinGroupRequestProtocols(), + assignment1 + ) + 
); + expectedClassicGroup.add( + new ClassicGroupMember( + memberId2, + Optional.ofNullable(member2.instanceId()), + DEFAULT_CLIENT_ID, + DEFAULT_CLIENT_ADDRESS.toString(), + joinRequest.rebalanceTimeoutMs(), + joinRequest.sessionTimeoutMs(), + joinRequest.protocolType(), + joinRequest.protocols(), + assignment2 + ) + ); + + List<CoordinatorRecord> expectedRecords = Arrays.asList( + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2), + + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId), + + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1), + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId), + + GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting()) + ); + + assertEquals(expectedRecords.size(), downgradeTimeout.result.records().size()); + assertUnorderedListEquals(expectedRecords.subList(0, 2), downgradeTimeout.result.records().subList(0, 2)); + assertUnorderedListEquals(expectedRecords.subList(2, 4), downgradeTimeout.result.records().subList(2, 4)); + assertRecordEquals(expectedRecords.get(4), downgradeTimeout.result.records().get(4)); + assertUnorderedListEquals(expectedRecords.subList(5, 7), downgradeTimeout.result.records().subList(5, 7)); + assertRecordsEquals(expectedRecords.subList(7, 9), downgradeTimeout.result.records().subList(7, 9)); 
+ + // Leader can be either member 1 or member 2. + try { + assertRecordEquals(expectedRecords.get(9), downgradeTimeout.result.records().get(9)); + } catch (AssertionFailedError e) { + expectedClassicGroup.setLeaderId(Optional.of(memberId2)); + assertRecordEquals( + GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting()), + downgradeTimeout.result.records().get(9) + ); + } + + verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null); + verify(context.metrics, times(1)).onClassicGroupStateTransition(null, STABLE); + + // The new classic member 1 has a heartbeat timeout. + ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = context.timer.timeout( + classicGroupHeartbeatKey(groupId, memberId1) + ); + assertNotNull(heartbeatTimeout); + + // No rebalance is triggered. + ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false); + assertTrue(classicGroup.isInState(STABLE)); + } + Review Comment: I wonder if we should also test the following case: * Last member using the consumer protocol leaves. We schedule the downgrade task; * A new member using the consumer protocol joins before the downgrade task is executed; * The downgrade task is executed and should be a no-op. What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2049,6 +2099,11 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs()); responseFuture.complete(response); + + // Maybe downgrade the consumer group if the last member using the + // consumer protocol is replaced by the joining member. No rebalance + // is needed after the replacement. 
+ scheduleConsumerGroupDowngradeTimeout(group, false); Review Comment: I wonder if we should rather call this one outside of the `appendFuture`. My concern is that we call `validateOnlineDowngrade` in `scheduleConsumerGroupDowngradeTimeout` and it accesses the uncommitted state of the group while we actually complete the write operation. This may lead to unexpected results. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3128,6 +3189,27 @@ private void cancelConsumerGroupSyncTimeout( timer.cancel(consumerGroupSyncKey(groupId, memberId)); } + /** + * Maybe schedules the downgrade timeout for the consumer group. + * + * @param consumerGroup The group to downgrade. + * @param needsRebalance The boolean indicating whether a rebalance should be triggered after the conversion. + */ + private void scheduleConsumerGroupDowngradeTimeout( + ConsumerGroup consumerGroup, + boolean needsRebalance + ) { + if (validateOnlineDowngrade(consumerGroup)) { Review Comment: If we do the changes that I suggested, I think that we could remove this check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org