lucasbru commented on code in PR #18809: URL: https://github.com/apache/kafka/pull/18809#discussion_r1952699451
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3138,6 +3274,45 @@ private void replaceMember( )); } + /** + * Fences a member from a streams group and maybe downgrade the streams group to a classic group. Review Comment: Done ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -830,6 +844,385 @@ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { verify(groupMetadataManager, times(1)).replay(key, null); } + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3385,6 +3627,49 @@ private void scheduleConsumerGroupRebalanceTimeout( }); } + /** + * Schedules a rebalance timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param memberEpoch The member epoch. + * @param rebalanceTimeoutMs The rebalance timeout. 
+ */ + private void scheduleStreamsGroupRebalanceTimeout( + String groupId, + String memberId, + int memberEpoch, + int rebalanceTimeoutMs + ) { + String key = streamsGroupRebalanceTimeoutKey(groupId, memberId); + timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { + try { + StreamsGroup group = streamsGroup(groupId); + StreamsGroupMember member = group.getOrMaybeCreateMember(memberId, false); + + if (member.memberEpoch() == memberEpoch) { + log.info("[GroupId {}] Member {} fenced from the group because " + + "it failed to transition from epoch {} within {}ms.", + groupId, memberId, memberEpoch, rebalanceTimeoutMs); + + return streamsGroupFenceMember(group, member, null); + } else { + log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " + + "left the epoch {}.", groupId, memberId, memberEpoch); Review Comment: Done. It's not in that epoch anymore, so progressed. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3329,6 +3550,27 @@ private void scheduleShareGroupSessionTimeout( ); } + /** + * Schedules (or reschedules) the session timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param sessionTimeoutMs The session timeout. + */ + private void scheduleStreamsGroupSessionTimeout( Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -760,6 +859,43 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup( } } + /** + * The method should be called on the replay path. + * Gets or maybe creates a streams group and updates the groups map if a new group is created. + * + * @param groupId The group id. + * @param createIfNotExists A boolean indicating whether the group should be + * created if it does not exist. + * + * @return A StreamsGroup. 
+ * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or + * if the group is not a streams group. + * @throws IllegalStateException if the group does not have the expected type. + * Package private for testing. + */ + StreamsGroup getOrMaybeCreatePersistedStreamsGroup( Review Comment: Done ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -830,6 +844,385 @@ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() { verify(groupMetadataManager, times(1)).replay(key, null); } + @Test + public void testReplayStreamsGroupMetadata() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + StreamsGroupMetadataKey key = new StreamsGroupMetadataKey(); + StreamsGroupMetadataValue value = new StreamsGroupMetadataValue(); + + coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record( + key, + new ApiMessageAndVersion(value, (short) 0) + )); + + verify(groupMetadataManager, times(1)).replay(key, value); Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -687,6 +717,75 @@ ConsumerGroup getOrMaybeCreateConsumerGroup( } } + /** + * Gets or maybe creates a streams group without updating the groups map. + * The group will be materialized during the replay. 
+ * + * @param groupId The group id. + * @param createIfNotExists A boolean indicating whether the group should be + * created if it does not exist or is an empty classic group. + * + * @return A StreamsGroup. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or + * if the group is not a streams group. + * + * Package private for testing. + */ + StreamsGroup getOrMaybeCreateStreamsGroup( + String groupId, + boolean createIfNotExists + ) throws GroupIdNotFoundException { + Group group = groups.get(groupId); + + if (group == null && !createIfNotExists) { + throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId)); + } + + if (group == null) { + return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics); + } else { + if (group.type() == STREAMS) { + return (StreamsGroup) group; + } else { + throw new GroupIdNotFoundException(String.format("Group %s is not a streams group.", groupId)); + } + } + } + + /** + * Gets a streams group by committed offset. + * + * @param groupId The group id. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A StreamsGroup. + * @throws GroupIdNotFoundException if the group does not exist or is not a streams group. + */ + public StreamsGroup streamsGroup( Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3138,6 +3274,45 @@ private void replaceMember( )); } + /** + * Fences a member from a streams group and maybe downgrade the streams group to a classic group. + * + * @param group The group. + * @param member The member. + * @param response The response of the CoordinatorResult. + * + * @return The CoordinatorResult to be applied. 
+ */ + private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember( + StreamsGroup group, + StreamsGroupMember member, + T response + ) { + List<CoordinatorRecord> records = new ArrayList<>(); + removeStreamsMember(records, group.groupId(), member.memberId()); + + // We bump the group epoch. + int groupEpoch = group.groupEpoch() + 1; + records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch)); + + cancelTimers(group.groupId(), member.memberId()); + + return new CoordinatorResult<>(records, response); + } + + /** + * Write tombstones for the member. The order matters here. + * + * @param records The list of records to append the member assignment tombstone records. + * @param groupId The group id. + * @param memberId The member id. + */ + private void removeStreamsMember(List<CoordinatorRecord> records, String groupId, String memberId) { Review Comment: Refactored, but it might become annoying in heartbeat when we assemble a larger collection of records. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3962,7 +4359,167 @@ public void replay( } removeGroup(groupId); } + } + + /** + * Replays StreamsGroupMemberMetadataKey/Value to update the hard state of + * the streams group. + * It updates the subscription part of the member or deletes the member. + * + * @param key A StreamsGroupMemberMetadataKey key. + * @param value A StreamsGroupMemberMetadataValue record. + */ + public void replay( + StreamsGroupMemberMetadataKey key, + StreamsGroupMemberMetadataValue value + ) { + String groupId = key.groupId(); + String memberId = key.memberId(); + + StreamsGroup streamsGroup; + try { + streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist and a tombstone is replayed, we can ignore it. 
return; + } + + if (value != null) { + StreamsGroupMember oldMember = streamsGroup.getOrMaybeCreateMember(memberId, true); + streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember) + .updateWith(value) + .build()); Review Comment: I understand, but I don't think the pattern is too crazy. In the RPC handlers, we don't want to modify the internal state of the group, so we have "getters" and "updaters" and not operations that do both - that's why `getOrMaybeCreate` does not add the member to the group. In a sense, `getOrMaybeCreate` is very much like `getOrDefault`, just with a specific default implementation. That seems like a valid pattern to me, but I agree that one can be misled into thinking that "creation" will also add the member to the streams group. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org