dajac commented on code in PR #17286:
URL: https://github.com/apache/kafka/pull/17286#discussion_r1778129613


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2727,33 +2782,39 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMember(
         ConsumerGroupMember member,
         T response
     ) {
-        if (validateOnlineDowngrade(group, member.memberId())) {
-            return convertToClassicGroup(group, member.memberId(), response);
-        } else {
-            List<CoordinatorRecord> records = new ArrayList<>();
-            removeMember(records, group.groupId(), member.memberId());
+        List<CoordinatorRecord> records = new ArrayList<>();
+        removeMember(records, group.groupId(), member.memberId());
 
-            // We update the subscription metadata without the leaving member.
-            Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
-                group.computeSubscribedTopicNames(member, null),
-                metadataImage.topics(),
-                metadataImage.cluster()
-            );
+        // We update the subscription metadata without the leaving member.
+        Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
+            group.computeSubscribedTopicNames(member, null),
+            metadataImage.topics(),
+            metadataImage.cluster()
+        );
 
-            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-                log.info("[GroupId {}] Computed new subscription metadata: {}.",
-                    group.groupId(), subscriptionMetadata);
-                records.add(newConsumerGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
-            }
+        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+            log.info("[GroupId {}] Computed new subscription metadata: {}.",
+                group.groupId(), subscriptionMetadata);
+            records.add(newConsumerGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
+        }
 
-            // We bump the group epoch.
+        // We bump the group epoch if the group doesn't need a downgrade,
+        // or the rebalance will be triggered after the downgrade conversion.
+        if (!validateOnlineDowngradeWithFencedMember(group, member.memberId())) {
             int groupEpoch = group.groupEpoch() + 1;
             records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch));
+        }
 
-            cancelTimers(group.groupId(), member.memberId());
+        cancelTimers(group.groupId(), member.memberId());
 
-            return new CoordinatorResult<>(records, response);
-        }
+        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+        appendFuture.whenComplete((__, t) -> {
+            if (t == null) {
+                scheduleConsumerGroupDowngradeTimeout(group, true);

Review Comment:
   I have a similar thought here. I wonder if we should just schedule it at L2806 in an else branch. We already know at that point that the group needs to be downgraded. What do you think?
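
   The restructure suggested above could look roughly like this (a sketch against the identifiers visible in this hunk, not the actual patch):

   ```java
   // Sketch: bump the epoch when no downgrade is needed; otherwise schedule
   // the downgrade right here, in the else branch, where the need to convert
   // the group is already known.
   if (!validateOnlineDowngradeWithFencedMember(group, member.memberId())) {
       int groupEpoch = group.groupEpoch() + 1;
       records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch));
   } else {
       // The group must be converted, so the timeout can be scheduled here
       // instead of inside the append future's callback.
       scheduleConsumerGroupDowngradeTimeout(group, true);
   }
   ```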



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3125,25 +3128,25 @@ public void testSessionTimeoutExpiration() {
         List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(45000 + 1);
 
         // Verify the expired timeout.
+        assertEquals(1, timeouts.size());

Review Comment:
   Is there a reason why we need to change this code? Ah, it may be because of the append future. Is that the case?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10944,15 +10947,225 @@ public void testLastConsumerProtocolMemberLeavingConsumerGroup() {
         ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
         assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 
-        // Simulate a failed write to the log.
-        result.appendFuture().completeExceptionally(new NotLeaderOrFollowerException());
+        // Simulate a failed conversion.
+        downgradeTimeout.result.appendFuture().completeExceptionally(new NotLeaderOrFollowerException());
         context.rollback();
 
         // The group is reverted back to the consumer group.
         assertEquals(consumerGroup, context.groupMetadataManager.consumerGroup(groupId));
         verify(context.metrics, times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, null);
     }
 
+    @Test
+    public void testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws ExecutionException, InterruptedException {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols)
+            )
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(instanceId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(Collections.singletonList("foo"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and static member 2 uses the consumer protocol.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withConsumerGroupAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 2)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+            {
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 2));
+            }
+        }));
+
+        context.commit();
+
+        // A new member using classic protocol with the same instance id joins, scheduling the downgrade.
+        JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        GroupMetadataManagerTestContext.JoinResult result = context.sendClassicGroupJoin(joinRequest);
+        result.appendFuture.complete(null);
+        memberId2 = result.joinFuture.get().memberId();
+
+        ExpiredTimeout<Void, CoordinatorRecord> downgradeTimeout = context.sleep(0).get(0);
+        assertEquals(consumerGroupDowngradeKey(groupId), downgradeTimeout.key);
+
+        byte[] assignment1 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 0),
+            new TopicPartition(fooTopicName, 1),
+            new TopicPartition(fooTopicName, 2),
+            new TopicPartition(barTopicName, 0),
+            new TopicPartition(barTopicName, 1)
+        ))));
+        byte[] assignment2 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 3),
+            new TopicPartition(fooTopicName, 4),
+            new TopicPartition(fooTopicName, 5)
+        ))));
+        Map<String, byte[]> assignments = new HashMap<>();
+        assignments.put(memberId1, assignment1);
+        assignments.put(memberId2, assignment2);
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            new LogContext(),
+            groupId,
+            STABLE,
+            context.time,
+            context.metrics,
+            10,
+            Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.of("range"),
+            Optional.of(memberId1),
+            Optional.of(context.time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.ofNullable(member1.instanceId()),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                member1.classicProtocolSessionTimeout().get(),
+                ConsumerProtocol.PROTOCOL_TYPE,
+                member1.supportedJoinGroupRequestProtocols(),
+                assignment1
+            )
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId2,
+                Optional.ofNullable(member2.instanceId()),
+                DEFAULT_CLIENT_ID,
+                DEFAULT_CLIENT_ADDRESS.toString(),
+                joinRequest.rebalanceTimeoutMs(),
+                joinRequest.sessionTimeoutMs(),
+                joinRequest.protocolType(),
+                joinRequest.protocols(),
+                assignment2
+            )
+        );
+
+        List<CoordinatorRecord> expectedRecords = Arrays.asList(
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
+
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
+            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
+
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2),
+            GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
+
+            GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting())
+        );
+
+        assertEquals(expectedRecords.size(), downgradeTimeout.result.records().size());
+        assertUnorderedListEquals(expectedRecords.subList(0, 2), downgradeTimeout.result.records().subList(0, 2));
+        assertUnorderedListEquals(expectedRecords.subList(2, 4), downgradeTimeout.result.records().subList(2, 4));
+        assertRecordEquals(expectedRecords.get(4), downgradeTimeout.result.records().get(4));
+        assertUnorderedListEquals(expectedRecords.subList(5, 7), downgradeTimeout.result.records().subList(5, 7));
+        assertRecordsEquals(expectedRecords.subList(7, 9), downgradeTimeout.result.records().subList(7, 9));
+
+        // Leader can be either member 1 or member 2.
+        try {
+            assertRecordEquals(expectedRecords.get(9), downgradeTimeout.result.records().get(9));
+        } catch (AssertionFailedError e) {
+            expectedClassicGroup.setLeaderId(Optional.of(memberId2));
+            assertRecordEquals(
+                GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting()),
+                downgradeTimeout.result.records().get(9)
+            );
+        }
+
+        verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
+        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, STABLE);
+
+        // The new classic member 1 has a heartbeat timeout.
+        ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = context.timer.timeout(
+            classicGroupHeartbeatKey(groupId, memberId1)
+        );
+        assertNotNull(heartbeatTimeout);
+
+        // No rebalance is triggered.
+        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(classicGroup.isInState(STABLE));
+    }
+

Review Comment:
   I wonder if we should also test the following case:
   * Last member using the consumer protocol leaves. We schedule the downgrade task;
   * A new member using the consumer protocol joins before the downgrade task is executed;
   * The downgrade task is executed and should be a no-op.
   
   What do you think?
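   
   The suggested scenario could be outlined along the lines of the existing tests in this file (a hypothetical skeleton only; the method name and setup are illustrative, not part of the patch):
   
   ```java
   // Outline: last consumer-protocol member leaves, a new consumer-protocol
   // member joins before the downgrade task fires, so the task must be a no-op.
   @Test
   public void testDowngradeTaskNoOpWhenConsumerProtocolMemberRejoins() {
       // 1. Build a group whose last consumer-protocol member leaves,
       //    scheduling the downgrade task.
       // 2. Have a new consumer-protocol member join before the task runs.
       // 3. Fire the timer via context.sleep(...) and assert the expired
       //    timeout produced no records and the group is still a consumer group.
   }
   ```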



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2049,6 +2099,11 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
                 scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs());
 
                 responseFuture.complete(response);
+
+                // Maybe downgrade the consumer group if the last member using the
+                // consumer protocol is replaced by the joining member. No rebalance
+                // is needed after the replacement.
+                scheduleConsumerGroupDowngradeTimeout(group, false);

Review Comment:
   I wonder if we should rather call this one outside of the `appendFuture`. My concern is that we call `validateOnlineDowngrade` in `scheduleConsumerGroupDowngradeTimeout` and it accesses the uncommitted state of the group while we are actually completing the write operation. This may lead to unexpected results.
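   
   The timing hazard can be seen with a minimal, self-contained example using only plain `CompletableFuture` (no Kafka types; `CallbackTimingDemo` and `groupEpoch` are stand-ins, not code from this PR): a callback registered before a write completes observes whatever the shared state holds at completion time, not at registration time.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class CallbackTimingDemo {
       public static void main(String[] args) {
           // Stand-in for the group's (uncommitted) state.
           AtomicInteger groupEpoch = new AtomicInteger(10);
           CompletableFuture<Void> appendFuture = new CompletableFuture<>();
   
           // Registered before the write is committed, like the downgrade
           // scheduling inside whenComplete in the patch under review.
           appendFuture.whenComplete((ignored, error) -> {
               if (error == null) {
                   // Reads the state as it is *now*, at completion time.
                   System.out.println("callback sees epoch " + groupEpoch.get());
               }
           });
   
           // The state moves on before the append completes...
           groupEpoch.set(11);
           // ...so the callback observes epoch 11, not 10.
           appendFuture.complete(null);
       }
   }
   ```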



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3128,6 +3189,27 @@ private void cancelConsumerGroupSyncTimeout(
         timer.cancel(consumerGroupSyncKey(groupId, memberId));
     }
 
+    /**
+     * Maybe schedules the downgrade timeout for the consumer group.
+     *
+     * @param consumerGroup     The group to downgrade.
+     * @param needsRebalance    The boolean indicating whether a rebalance should be triggered after the conversion.
+    private void scheduleConsumerGroupDowngradeTimeout(
+        ConsumerGroup consumerGroup,
+        boolean needsRebalance
+    ) {
+        if (validateOnlineDowngrade(consumerGroup)) {

Review Comment:
   If we do the changes that I suggested, I think that we could remove this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
