lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2020725313

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +2019,290 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
             .toList();
     }
 
+    /**
+     * Handles a regular heartbeat from a Streams group member.
+     * It mainly consists of five parts:
+     * 1) Create or update the member.
+     *    The group epoch is bumped if the member has been created or updated.
+     * 2) Initialize or update the topology.
+     *    The group epoch is bumped if the topology has been created or updated.
+     * 3) Determine the partition metadata and any internal topics that need to be created.
+     * 4) Update the target assignment for the streams group if the group epoch
+     *    is larger than the current target assignment epoch.
+     * 5) Reconcile the member's assignment with the target assignment.
+     *
+     * @param groupId             The group ID from the request.
+     * @param memberId            The member ID from the request.
+     * @param memberEpoch         The member epoch from the request.
+     * @param instanceId          The instance ID from the request or null.
+     * @param rackId              The rack ID from the request or null.
+     * @param rebalanceTimeoutMs  The rebalance timeout from the request or -1.
+     * @param clientId            The client ID.
+     * @param clientHost          The client host.
+     * @param topology            The topology from the request or null.
+     * @param ownedActiveTasks    The list of owned active tasks from the request or null.
+     * @param ownedStandbyTasks   The list of owned standby tasks from the request or null.
+     * @param ownedWarmupTasks    The list of owned warmup tasks from the request or null.
+     * @param userEndpoint        User-defined endpoint for Interactive Queries, or null.
+     * @param clientTags          Used for rack-aware assignment algorithm, or null.
+     * @param taskEndOffsets      Cumulative changelog offsets for tasks, or null.
+     * @param taskOffsets         Cumulative changelog end-offsets for tasks, or null.
+     * @param shutdownApplication Whether all Streams clients in the group should shut down.
+     * @return A result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+     */
+    private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        String instanceId,
+        String rackId,
+        int rebalanceTimeoutMs,
+        String clientId,
+        String clientHost,
+        StreamsGroupHeartbeatRequestData.Topology topology,
+        List<TaskIds> ownedActiveTasks,
+        List<TaskIds> ownedStandbyTasks,
+        List<TaskIds> ownedWarmupTasks,
+        String processId,
+        Endpoint userEndpoint,
+        List<KeyValue> clientTags,
+        List<TaskOffset> taskOffsets,
+        List<TaskOffset> taskEndOffsets,
+        boolean shutdownApplication
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<CoordinatorRecord> records = new ArrayList<>();
+        final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+        // Get or create the streams group.
+        boolean isJoining = memberEpoch == 0;
+        final StreamsGroup group = isJoining ? getOrCreateStreamsGroup(groupId) : getStreamsGroupOrThrow(groupId);
+        throwIfStreamsGroupIsFull(group, memberId);

Review Comment:
   I think there would be a risk that, in the future, somebody uses `getStreamsGroupOrThrow` in a place where `GroupMaxSizeReachedException` is not expected. It would be a bit unexpected that `getStreamsGroupOrThrow` also does validation that is only needed if we plan to add a member. I think it's better to keep those concerns separate.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
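
For illustration only, the separation argued for in the review comment could look roughly like the sketch below. It is not the code from the PR: `GroupLookupSketch`, `Group`, `GroupFullException`, `GroupNotFoundException`, and the `maxSize` field are hypothetical stand-ins, and the capacity rule (reject only a member that is not yet in the group) is an assumption. The point it tries to show is that the lookup helper only looks up, while the size check stays a separate call made by the one caller that may add a member.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the coordinator; not Kafka's actual classes.
class GroupLookupSketch {
    static class GroupNotFoundException extends RuntimeException {
        GroupNotFoundException(String msg) { super(msg); }
    }

    static class GroupFullException extends RuntimeException {
        GroupFullException(String msg) { super(msg); }
    }

    static class Group {
        final String id;
        final Set<String> members = new HashSet<>();
        Group(String id) { this.id = id; }
    }

    private final Map<String, Group> groups = new HashMap<>();
    private final int maxSize; // assumed configuration value

    GroupLookupSketch(int maxSize) { this.maxSize = maxSize; }

    // Lookup only: creates the group when it does not exist yet.
    Group getOrCreateGroup(String groupId) {
        return groups.computeIfAbsent(groupId, Group::new);
    }

    // Lookup only: callers never have to expect a "group full" error here.
    Group getGroupOrThrow(String groupId) {
        Group group = groups.get(groupId);
        if (group == null) {
            throw new GroupNotFoundException("Group " + groupId + " not found.");
        }
        return group;
    }

    // Validation only: relevant solely when a member might be added.
    void throwIfGroupIsFull(Group group, String memberId) {
        if (!group.members.contains(memberId) && group.members.size() >= maxSize) {
            throw new GroupFullException("Group " + group.id + " reached its maximum size.");
        }
    }

    // The heartbeat path composes the two concerns explicitly.
    Group heartbeat(String groupId, String memberId, int memberEpoch) {
        boolean isJoining = memberEpoch == 0;
        Group group = isJoining ? getOrCreateGroup(groupId) : getGroupOrThrow(groupId);
        throwIfGroupIsFull(group, memberId);
        group.members.add(memberId);
        return group;
    }

    public static void main(String[] args) {
        GroupLookupSketch coordinator = new GroupLookupSketch(1);
        coordinator.heartbeat("app", "member-1", 0);
        try {
            coordinator.heartbeat("app", "member-2", 0);
        } catch (GroupFullException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        // A plain lookup still succeeds even though the group is full.
        System.out.println("Members: " + coordinator.getGroupOrThrow("app").members.size());
    }
}
```

With this split, any other caller of the lookup helper (for example a describe or leave path) cannot be surprised by a size-limit exception, at the cost of the heartbeat path having to remember the extra call.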