dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582403702
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1413,6 +1506,243 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
return new CoordinatorResult<>(records, response);
}
+ /**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group The group to join.
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group phase completes.
+ */
+ private CoordinatorResult<Void, Record> classicGroupJoinToConsumerGroup(
+ ConsumerGroup group,
+ RequestContext context,
+ JoinGroupRequestData request,
+ CompletableFuture<JoinGroupResponseData> responseFuture
+ ) throws ApiException {
+ final long currentTimeMs = time.milliseconds();
+ final List<Record> records = new ArrayList<>();
+ final String groupId = request.groupId();
+ String memberId = request.memberId();
+ final String instanceId = request.groupInstanceId();
+ final JoinGroupRequestProtocolCollection protocols = request.protocols();
+ final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+ throwIfConsumerGroupIsFull(group, memberId);
+ throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);
+ // TODO: need to throw an exception if group is dead?
Review Comment:
Do we ever transition a consumer group to DEAD? It's always confusing when
it comes to type checks in the new group coordinator.