cadonna commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r1988844236


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1994,287 @@ private 
List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
             .toList();
     }
 
+    /**
+     * Handles a regular heartbeat from a Streams group member.
+     * It mainly consists of five parts:
+     * 1) Create or update the member.
+     *    The group epoch is bumped if the member has been created or updated.
+     * 2) Initialize or update the topology.
+     *    The group epoch is bumped if the topology has been created or 
updated.
+     * 3) Determine the partition metadata and any internal topics that need 
to be created.
+     * 4) Update the target assignment for the streams group if the group epoch
+     *    is larger than the current target assignment epoch.
+     * 5) Reconcile the member's assignment with the target assignment.
+     *
+     * @param groupId             The group ID from the request.
+     * @param memberId            The member ID from the request.
+     * @param memberEpoch         The member epoch from the request.
+     * @param instanceId          The instance ID from the request or null.
+     * @param rackId              The rack ID from the request or null.
+     * @param rebalanceTimeoutMs  The rebalance timeout from the request or -1.
+     * @param clientId            The client ID.
+     * @param clientHost          The client host.
+     * @param topology            The topology from the request or null.
+     * @param ownedActiveTasks    The list of owned active tasks from the 
request or null.
+     * @param ownedStandbyTasks   The list of owned standby tasks from the 
request or null.
+     * @param ownedWarmupTasks    The list of owned warmup tasks from the 
request or null.
+     * @param processId           The process ID of the member, or null.
+     * @param userEndpoint        User-defined endpoint for Interactive 
Queries, or null.
+     * @param clientTags          Used for rack-aware assignment algorithm, or 
null.
+     * @param taskEndOffsets      Cumulative changelog end-offsets for tasks, 
or null.
+     * @param taskOffsets         Cumulative changelog offsets for tasks, or 
null.
+     * @param shutdownApplication Whether all Streams clients in the group 
should shut down.
+     * @return A Result containing the StreamsGroupHeartbeat response and a 
list of records to update the state machine.
+     */
+    private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
streamsGroupHeartbeat(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        String instanceId,
+        String rackId,
+        int rebalanceTimeoutMs,
+        String clientId,
+        String clientHost,
+        StreamsGroupHeartbeatRequestData.Topology topology,
+        List<TaskIds> ownedActiveTasks,
+        List<TaskIds> ownedStandbyTasks,
+        List<TaskIds> ownedWarmupTasks,
+        String processId,
+        Endpoint userEndpoint,
+        List<KeyValue> clientTags,
+        List<TaskOffset> taskOffsets,
+        List<TaskOffset> taskEndOffsets,
+        boolean shutdownApplication
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<CoordinatorRecord> records = new ArrayList<>();
+        final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = 
new ArrayList<>();
+
+        // Get or create the streams group.
+        boolean isJoining = memberEpoch == 0;
+        final StreamsGroup group = isJoining ? 
getOrCreateStreamsGroup(groupId) : getStreamsGroupOrThrow(groupId);
+
+        // Get or create the member.
+        StreamsGroupMember member;
+        if (instanceId == null) {
+            member = getOrMaybeCreateDynamicStreamsGroupMember(
+                group,
+                memberId,
+                memberEpoch,
+                ownedActiveTasks,
+                ownedStandbyTasks,
+                ownedWarmupTasks,
+                isJoining
+            );
+        } else {
+            throw new UnsupportedOperationException("Static members are not 
supported yet.");
+        }
+
+        // 1. Create or update the member.
+        StreamsGroupMember updatedMember = new 
StreamsGroupMember.Builder(member)
+            .maybeUpdateInstanceId(Optional.empty())
+            .maybeUpdateRackId(Optional.ofNullable(rackId))
+            .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
+            .maybeUpdateTopologyEpoch(topology != null ? 
OptionalInt.of(topology.epoch()) : OptionalInt.empty())
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .maybeUpdateProcessId(Optional.ofNullable(processId))
+            .maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> 
x.stream().collect(Collectors.toMap(KeyValue::key, KeyValue::value))))
+            .maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x 
-> new 
StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port())))
+            .build();
+
+        // If the member is new or has changed, a 
StreamsGroupMemberMetadataValue record is written to the __consumer_offsets 
partition
+        // to persist the change, and bump the group epoch later.
+        boolean bumpGroupEpoch = hasStreamsMemberMetadataChanged(groupId, 
member, updatedMember, records);
+
+        // 2. Initialize/Update the group topology.
+        // If the topology is new or has changed, a StreamsGroupTopologyValue 
record is written to the __consumer_offsets partition to persist
+        // the change. The group epoch is bumped if the topology has changed.
+        StreamsTopology updatedTopology = maybeUpdateTopology(groupId, 
memberId, topology, group, records);
+        maybeSetTopologyStaleStatus(group, updatedMember, returnedStatus);
+
+        // 3. Determine the partition metadata and any internal topics if 
needed.
+        ConfiguredTopology updatedConfiguredTopology;
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
updatedPartitionMetadata;
+        boolean reconfigureTopology = group.topology().isEmpty();
+        if (reconfigureTopology || group.hasMetadataExpired(currentTimeMs)) {
+
+            updatedPartitionMetadata = group.computePartitionMetadata(
+                metadataImage.topics(),
+                updatedTopology
+            );
+
+            if (!updatedPartitionMetadata.equals(group.partitionMetadata())) {
+                log.info("[GroupId {}][MemberId {}] Computed new partition 
metadata: {}.",
+                    groupId, memberId, updatedPartitionMetadata);
+                bumpGroupEpoch = true;
+                reconfigureTopology = true;
+                records.add(newStreamsGroupPartitionMetadataRecord(groupId, 
updatedPartitionMetadata));
+                group.setPartitionMetadata(updatedPartitionMetadata);
+            }
+
+            if (reconfigureTopology || group.configuredTopology().isEmpty()) {
+                log.info("[GroupId {}][MemberId {}] Configuring the topology 
{}", groupId, memberId, updatedTopology);
+                updatedConfiguredTopology = 
InternalTopicManager.configureTopics(logContext, updatedTopology, 
updatedPartitionMetadata);
+            } else {
+                updatedConfiguredTopology = group.configuredTopology().get();
+            }
+        } else {
+            updatedConfiguredTopology = group.configuredTopology().get();
+            updatedPartitionMetadata = group.partitionMetadata();
+        }
+
+        // Actually bump the group epoch
+        int groupEpoch = group.groupEpoch();
+        if (bumpGroupEpoch) {
+            groupEpoch += 1;
+            records.add(newStreamsGroupEpochRecord(groupId, groupEpoch));
+            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{}.", groupId, memberId, groupEpoch);
+            metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
+            group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
+        }
+
+        // 4. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member.
+        // The delta between the existing and the new target assignment is 
persisted to the partition.
+        int targetAssignmentEpoch;
+        TasksTuple targetAssignment;
+        if (groupEpoch > group.assignmentEpoch()) {
+            targetAssignment = updateStreamsTargetAssignment(
+                group,
+                groupEpoch,
+                updatedMember,
+                updatedConfiguredTopology,
+                updatedPartitionMetadata,
+                records
+            );
+            targetAssignmentEpoch = groupEpoch;
+        } else {
+            targetAssignmentEpoch = group.assignmentEpoch();
+            targetAssignment = 
group.targetAssignment(updatedMember.memberId());
+        }
+
+        // 5. Reconcile the member's assignment with the target assignment if 
the member is not
+        // fully reconciled yet.
+        updatedMember = maybeReconcile(
+            groupId,
+            updatedMember,
+            group::currentActiveTaskProcessId,
+            group::currentStandbyTaskProcessIds,
+            group::currentWarmupTaskProcessIds,
+            targetAssignmentEpoch,
+            targetAssignment,
+            ownedActiveTasks,
+            ownedStandbyTasks,
+            ownedWarmupTasks,
+            records
+        );
+
+        scheduleStreamsGroupSessionTimeout(groupId, memberId);

Review Comment:
   Out of curiosity, why is the session timeout scheduled here and not at the 
very end of the method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to