lucasbru commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r1987325758
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,277 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
             .toList();
     }
 
+    /**
+     * Handles a regular heartbeat from a Streams group member.
+     * It mainly consists of five parts:
+     * 1) Create or update the member.
+     *    The group epoch is bumped if the member has been created or updated.
+     * 2) Initialize or update the topology. The group epoch is bumped if the topology
+     *    has been created or updated.
+     * 3) Determine the partition metadata and any internal topics that need to be created.
+     * 4) Update the target assignment for the streams group if the group epoch
+     *    is larger than the current target assignment epoch.
+     * 5) Reconcile the member's assignment with the target assignment.
+     *
+     * @param groupId             The group id from the request.
+     * @param memberId            The member ID from the request.
+     * @param memberEpoch         The member epoch from the request.
+     * @param instanceId          The instance ID from the request or null.
+     * @param rackId              The rack ID from the request or null.
+     * @param rebalanceTimeoutMs  The rebalance timeout from the request or -1.
+     * @param clientId            The client ID.
+     * @param clientHost          The client host.
+     * @param topology            The topology from the request or null.
+     * @param ownedActiveTasks    The list of owned active tasks from the request or null.
+     * @param ownedStandbyTasks   The list of owned standby tasks from the request or null.
+     * @param ownedWarmupTasks    The list of owned warmup tasks from the request or null.
+     * @param processId           The process ID from the request or null.
+     * @param userEndpoint        User-defined endpoint for Interactive Queries, or null.
+     * @param clientTags          Used for rack-aware assignment algorithm, or null.
+     * @param taskOffsets         Cumulative changelog offsets for tasks, or null.
+     * @param taskEndOffsets      Cumulative changelog end-offsets for tasks, or null.
+     * @param shutdownApplication Whether all Streams clients in the group should shut down.
+     * @return A Result containing the StreamsGroupHeartbeat response and a list of records to update the state machine.
+     */
+    private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        String instanceId,
+        String rackId,
+        int rebalanceTimeoutMs,
+        String clientId,
+        String clientHost,
+        final StreamsGroupHeartbeatRequestData.Topology topology,
+        List<TaskIds> ownedActiveTasks,
+        List<TaskIds> ownedStandbyTasks,
+        List<TaskIds> ownedWarmupTasks,
+        final String processId,
+        final Endpoint userEndpoint,
+        final List<KeyValue> clientTags,
+        final List<TaskOffset> taskOffsets,
+        final List<TaskOffset> taskEndOffsets,
+        final boolean shutdownApplication
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<CoordinatorRecord> records = new ArrayList<>();
+        final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<>();
+
+        // Get or create the streams group.
+        boolean createIfNotExists = memberEpoch == 0;
+        final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists);
+
+        // Get or create the member.
+        StreamsGroupMember member;
+        if (instanceId == null) {
+            member = getOrMaybeSubscribeDynamicStreamsGroupMember(
+                group,
+                memberId,
+                memberEpoch,
+                ownedActiveTasks,
+                ownedStandbyTasks,
+                ownedWarmupTasks,
+                createIfNotExists
+            );
+        } else {
+            throw new UnsupportedOperationException("Static members are not supported yet.");
+        }
+
+        // 1. Create or update the member.
+        // If the member is new or has changed, a StreamsMemberMetadataValue record is written to the __consumer_offsets partition to
+        // persist the change. If the subscriptions have changed, the subscription metadata is updated and persisted by writing a
+        // StreamsPartitionMetadataValue record to the __consumer_offsets partition. Finally, the group epoch is bumped if the
+        // subscriptions have changed, and persisted by writing a StreamsMetadataValue record to the partition.
+        StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
+            .maybeUpdateInstanceId(Optional.empty())
+            .maybeUpdateRackId(Optional.ofNullable(rackId))
+            .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
+            .maybeUpdateTopologyEpoch(topology != null ? OptionalInt.of(topology.epoch()) : OptionalInt.empty())
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .maybeUpdateProcessId(Optional.ofNullable(processId))
+            .maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> x.stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value))))
+            .maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x -> new StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port())))
+            .build();
+
+        // If the group is newly created, we must ensure that it moves away from
+        // epoch 0 and that it is fully initialized.
+        int groupEpoch = group.groupEpoch();
+        boolean bumpGroupEpoch = false;
+
+        bumpGroupEpoch |= hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
+
+        // 2. Initialize/Update the group topology.
+        // If the member is new or has changed, a StreamsGroupTopologyValue record is written to the __consumer_offsets partition to persist
+        // the change. The group epoch is bumped if the topology has changed.
+        StreamsTopology updatedTopology;
+        boolean reconfigureTopology = false;
+        if (topology != null) {
+            StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(topology);
+
+            updatedTopology = StreamsTopology.fromRequest(topology);
+
+            if (group.topology().isEmpty()) {
+                log.info("[GroupId {}][MemberId {}] Member initialized the topology with epoch {}", groupId, memberId, topology.epoch());
+
+                records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
+
+                reconfigureTopology = true;
+                bumpGroupEpoch = true;
+            } else if (!updatedTopology.equals(group.topology().get())) {
+                throw new InvalidRequestException("Topology updates are not supported yet.");
+            }
+        } else if (group.topology().isPresent()) {
+            updatedTopology = group.topology().get();
+        } else {
+            throw new IllegalStateException("The topology is null and the group topology is also null.");
+        }
+
+        if (group.topology().isPresent() && updatedMember.topologyEpoch() < group.topology().get().topologyEpoch()) {
+            returnedStatus.add(
+                new StreamsGroupHeartbeatResponseData.Status()
+                    .setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code())
+                    .setStatusDetail(
+                        String.format(
+                            "The member's topology epoch %d is behind the group's topology epoch %d.",
+                            updatedMember.topologyEpoch(),
+                            group.topology().get().topologyEpoch()
+                        )
+                    )
+            );
+        }
+
+        // 3. Determine the partition metadata and any internal topics if needed.
+        ConfiguredTopology updatedConfiguredTopology;
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> updatedPartitionMetadata;
+        if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {
+
+            // The partition metadata is updated when the refresh deadline has been reached.
+            updatedPartitionMetadata = group.computePartitionMetadata(
+                metadataImage.topics(),
+                updatedTopology
+            );
+
+            if (!updatedPartitionMetadata.equals(group.partitionMetadata())) {
+                log.info("[GroupId {}][MemberId {}] Computed new partition metadata: {}.",
+                    groupId, memberId, updatedPartitionMetadata);
+                bumpGroupEpoch = true;
+                reconfigureTopology = true;
+                records.add(newStreamsGroupPartitionMetadataRecord(groupId, updatedPartitionMetadata));
+                group.setPartitionMetadata(updatedPartitionMetadata);
+            }
+
+            if (reconfigureTopology || group.configuredTopology().isEmpty()) {
+                log.info("[GroupId {}][MemberId {}] Configuring the topology {}", groupId, memberId, updatedTopology);
+                updatedConfiguredTopology = InternalTopicManager.configureTopics(logContext, updatedTopology, updatedPartitionMetadata);
+            } else {
+                updatedConfiguredTopology = group.configuredTopology().get();
+            }
+        } else {
+            updatedConfiguredTopology = group.configuredTopology().get();
+            updatedPartitionMetadata = group.partitionMetadata();
+        }
+
+        // Actually bump the group epoch
+        if (bumpGroupEpoch) {
+            groupEpoch += 1;
+            records.add(newStreamsGroupEpochRecord(groupId, groupEpoch));
+            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch);
+            metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
+            group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
+        }
+
+        // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
+        // replaces an existing static member.
+        // The delta between the existing and the new target assignment is persisted to the partition.
+        int targetAssignmentEpoch;
+        TasksTuple targetAssignment;
+        if (groupEpoch > group.assignmentEpoch()) {
+            targetAssignment = updateStreamsTargetAssignment(
+                group,
+                groupEpoch,
+                updatedMember,
+                updatedConfiguredTopology,
+                updatedPartitionMetadata,
+                records
+            );
+            targetAssignmentEpoch = groupEpoch;
+        } else {
+            targetAssignmentEpoch = group.assignmentEpoch();
+            targetAssignment = group.targetAssignment(updatedMember.memberId());
+        }
+
+        // 5. Reconcile the member's assignment with the target assignment if the member is not
+        // fully reconciled yet.
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            group::currentActiveTaskProcessId,
+            group::currentStandbyTaskProcessIds,
+            group::currentWarmupTaskProcessIds,
+            targetAssignmentEpoch,
+            targetAssignment,
+            ownedActiveTasks,
+            ownedStandbyTasks,
+            ownedWarmupTasks,
+            records
+        );
+
+        scheduleStreamsGroupSessionTimeout(groupId, memberId);
+
+        // Prepare the response.
+        StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
+            .setMemberId(updatedMember.memberId())
+            .setMemberEpoch(updatedMember.memberEpoch())
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+
+        // The assignment is only provided in the following cases:
+        // 1. The member sent a full request.
+        //    It does so when joining or rejoining the group with zero
+        //    as the member epoch; or on any errors (e.g., timeout).
+        //    We use all the non-optional fields to detect a full request as those must be set in a full request.
+        // 2. The member's assignment has been updated.
+        boolean isFullRequest =
+            rebalanceTimeoutMs != -1
+                && ownedActiveTasks != null
+                && ownedStandbyTasks != null
+                && ownedWarmupTasks != null
+                && clientTags != null
+                && processId != null;

Review Comment:
   It used to be the case that the client in KIP-848 sometimes sent a full request upon an error, without resetting the epoch to rejoin. But I cannot find that code anymore. You are now the client-side expert: do we do it or not? I removed the extra condition for now.
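   To make the two options concrete, here is a rough sketch of the client-side policy in question. It is only an illustration, not the actual heartbeat request manager code; the method names and the `lastHeartbeatFailed` flag are invented for the example.

   ```java
   // Illustration only: two possible client-side policies for when a Streams member
   // sends a "full" heartbeat (all non-optional fields set) rather than a minimal one.
   public final class HeartbeatRequestPolicy {

       // Variant matching the simplified server-side check in this PR:
       // a full request is only sent when (re)joining with member epoch 0.
       static boolean sendFullRequestOnJoinOnly(int memberEpoch) {
           return memberEpoch == 0;
       }

       // Variant corresponding to the removed condition: additionally send a full
       // request after a failed heartbeat, without resetting the member epoch.
       static boolean sendFullRequestOnJoinOrError(int memberEpoch, boolean lastHeartbeatFailed) {
           return memberEpoch == 0 || lastHeartbeatFailed;
       }

       public static void main(String[] args) {
           // A member at epoch 5 whose previous heartbeat timed out:
           System.out.println(sendFullRequestOnJoinOnly(5));          // false
           System.out.println(sendFullRequestOnJoinOrError(5, true)); // true
       }
   }
   ```

   If the client only ever uses the first variant, the simplified `isFullRequest` check above should be enough; if it also resends everything after an error, that case would need to be detected here as well.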