dongnuo123 commented on code in PR #15954:
URL: https://github.com/apache/kafka/pull/15954#discussion_r1601951216


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync(
         return EMPTY_RESULT;
     }
 
+    /**
+     * Handle a SyncGroupRequest to a ConsumerGroup.
+     *
+     * @param group          The ConsumerGroup.
+     * @param context        The request context.
+     * @param request        The actual SyncGroup request.
+     * @param responseFuture The sync group response future.
+     *
+     * @return The result that contains records to append.
+     */
+    private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        SyncGroupRequestData request,
+        CompletableFuture<SyncGroupResponseData> responseFuture
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+
+        ConsumerGroupMember member;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(request.memberId(), false);
+        } else {
+            member = group.staticMember(instanceId);
+            if (member == null) {
+                throw new UnknownMemberIdException(
+                    String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId)
+                );
+            }
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+        }
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdOrProtocolUnmatched(
+            group,
+            member,
+            request.generationId(),
+            request.protocolType(),
+            request.protocolName()
+        );
+
+        cancelConsumerGroupSyncTimeout(groupId, memberId);
+//        scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout());
+
+        byte[] assignment = ConsumerProtocol.serializeAssignment(
+            new ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(), metadataImage.topics())),
+            deserializeProtocolVersion(member.classicMemberMetadata().get())
+        ).array();
+
+        responseFuture.complete(new SyncGroupResponseData()
+            .setProtocolType(request.protocolType())
+            .setProtocolName(request.protocolName())
+            .setAssignment(assignment)
+            .setErrorCode(Errors.NONE.code()));

Review Comment:
   Ah yes, you're right. I thought there was nothing to commit for EMPTY_RESULT.


