dongnuo123 commented on code in PR #15954: URL: https://github.com/apache/kafka/pull/15954#discussion_r1601951216
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3898,6 +3994,65 @@ public CoordinatorResult<Void, Record> classicGroupSync( return EMPTY_RESULT; } + /** + * Handle a SyncGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual SyncGroup request. + * @param responseFuture The sync group response future. + * + * @return The result that contains records to append. + */ + private CoordinatorResult<Void, Record> classicGroupSyncToConsumerGroup( + ConsumerGroup group, + RequestContext context, + SyncGroupRequestData request, + CompletableFuture<SyncGroupResponseData> responseFuture + ) throws UnknownMemberIdException, GroupIdNotFoundException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + + ConsumerGroupMember member; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(request.memberId(), false); + } else { + member = group.staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) + ); + } + throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); + } + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdOrProtocolUnmatched( + group, + member, + request.generationId(), + request.protocolType(), + request.protocolName() + ); + + cancelConsumerGroupSyncTimeout(groupId, memberId); +// scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicMemberSessionTimeout()); + + byte[] assignment = ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment(toTopicPartitionList(member.assignedPartitions(), metadataImage.topics())), + deserializeProtocolVersion(member.classicMemberMetadata().get()) + ).array(); + + responseFuture.complete(new 
SyncGroupResponseData() + .setProtocolType(request.protocolType()) + .setProtocolName(request.protocolName()) + .setAssignment(assignment) + .setErrorCode(Errors.NONE.code())); Review Comment: Ah yes, you're right. I thought there was nothing to commit for EMPTY_RESULT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org