lucasbru commented on code in PR #18809:
URL: https://github.com/apache/kafka/pull/18809#discussion_r1952699451


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3138,6 +3274,45 @@ private void replaceMember(
         ));
     }
 
+    /**
+     * Fences a member from a streams group and maybe downgrade the streams group to a classic group.

Review Comment:
   Done



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -830,6 +844,385 @@ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
         verify(groupMetadataManager, times(1)).replay(key, null);
     }
 
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3385,6 +3627,49 @@ private void scheduleConsumerGroupRebalanceTimeout(
         });
     }
 
+    /**
+     * Schedules a rebalance timeout for the member.
+     *
+     * @param groupId               The group id.
+     * @param memberId              The member id.
+     * @param memberEpoch           The member epoch.
+     * @param rebalanceTimeoutMs    The rebalance timeout.
+     */
+    private void scheduleStreamsGroupRebalanceTimeout(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        int rebalanceTimeoutMs
+    ) {
+        String key = streamsGroupRebalanceTimeoutKey(groupId, memberId);
+        timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
+            try {
+                StreamsGroup group = streamsGroup(groupId);
+                StreamsGroupMember member = group.getOrMaybeCreateMember(memberId, false);
+
+                if (member.memberEpoch() == memberEpoch) {
+                    log.info("[GroupId {}] Member {} fenced from the group because " +
+                            "it failed to transition from epoch {} within {}ms.",
+                        groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+
+                    return streamsGroupFenceMember(group, member, null);
+                } else {
+                    log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " +
+                        "left the epoch {}.", groupId, memberId, memberEpoch);

Review Comment:
   Done. The member is no longer in that epoch, so it has progressed.
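
   For readers following the thread, the epoch check in the hunk above can be illustrated with a minimal, self-contained sketch. It is not the Kafka implementation: `RebalanceTimeoutSketch`, `Outcome`, `setMemberEpoch`, and `onRebalanceTimeout` are hypothetical names, and treating an unknown member as "ignored" is an assumption made for brevity. The idea is only that the timeout captures the member epoch when it is scheduled and fences the member only if the member is still in that epoch when the timer fires.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; all names here are hypothetical, not Kafka APIs.
public class RebalanceTimeoutSketch {
    enum Outcome { FENCED, IGNORED }

    // Stand-in for the group's view of each member's current epoch.
    private final Map<String, Integer> memberEpochs = new HashMap<>();

    void setMemberEpoch(String memberId, int epoch) {
        memberEpochs.put(memberId, epoch);
    }

    // Invoked when the timer fires. scheduledEpoch is the member epoch that was
    // captured when the timeout was scheduled.
    Outcome onRebalanceTimeout(String memberId, int scheduledEpoch) {
        Integer current = memberEpochs.get(memberId);
        if (current != null && current == scheduledEpoch) {
            // The member never left the epoch it was asked to leave: fence it.
            memberEpochs.remove(memberId);
            return Outcome.FENCED;
        }
        // The member left that epoch (or is unknown, by assumption): stale timeout.
        return Outcome.IGNORED;
    }

    public static void main(String[] args) {
        RebalanceTimeoutSketch sketch = new RebalanceTimeoutSketch();
        sketch.setMemberEpoch("member-1", 5);
        System.out.println(sketch.onRebalanceTimeout("member-1", 4));
        System.out.println(sketch.onRebalanceTimeout("member-1", 5));
    }
}
```

   In this sketch, a timeout scheduled for epoch 4 is ignored once the member has reached epoch 5, which is the case the debug log message describes.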



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3329,6 +3550,27 @@ private void scheduleShareGroupSessionTimeout(
         );
     }
 
+    /**
+     * Schedules (or reschedules) the session timeout for the member.
+     *
+     * @param groupId           The group id.
+     * @param memberId          The member id.
+     * @param sessionTimeoutMs  The session timeout.
+     */
+    private void scheduleStreamsGroupSessionTimeout(

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -760,6 +859,43 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(
         }
     }
 
+    /**
+     * The method should be called on the replay path.
+     * Gets or maybe creates a streams group and updates the groups map if a new group is created.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should be
+     *                          created if it does not exist.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group is not a streams group.
+     * @throws IllegalStateException    if the group does not have the expected type.
+     * Package private for testing.
+     */
+    StreamsGroup getOrMaybeCreatePersistedStreamsGroup(

Review Comment:
   Done



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -830,6 +844,385 @@ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
         verify(groupMetadataManager, times(1)).replay(key, null);
     }
 
+    @Test
+    public void testReplayStreamsGroupMetadata() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
+        CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+        CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            Time.SYSTEM,
+            new MockCoordinatorTimer<>(Time.SYSTEM),
+            mock(GroupCoordinatorConfig.class),
+            coordinatorMetrics,
+            metricsShard
+        );
+
+        StreamsGroupMetadataKey key = new StreamsGroupMetadataKey();
+        StreamsGroupMetadataValue value = new StreamsGroupMetadataValue();
+
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, CoordinatorRecord.record(
+            key,
+            new ApiMessageAndVersion(value, (short) 0)
+        ));
+
+        verify(groupMetadataManager, times(1)).replay(key, value);

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -687,6 +717,75 @@ ConsumerGroup getOrMaybeCreateConsumerGroup(
         }
     }
 
+    /**
+     * Gets or maybe creates a streams group without updating the groups map.
+     * The group will be materialized during the replay.
+     *
+     * @param groupId           The group id.
+     * @param createIfNotExists A boolean indicating whether the group should be
+     *                          created if it does not exist or is an empty classic group.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or
+     *                                  if the group is not a streams group.
+     *
+     * Package private for testing.
+     */
+    StreamsGroup getOrMaybeCreateStreamsGroup(
+        String groupId,
+        boolean createIfNotExists
+    ) throws GroupIdNotFoundException {
+        Group group = groups.get(groupId);
+
+        if (group == null && !createIfNotExists) {
+            throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId));
+        }
+
+        if (group == null) {
+            return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+        } else {
+            if (group.type() == STREAMS) {
+                return (StreamsGroup) group;
+            } else {
+                throw new GroupIdNotFoundException(String.format("Group %s is not a streams group.", groupId));
+            }
+        }
+    }
+    
+    /**
+     * Gets a streams group by committed offset.
+     *
+     * @param groupId           The group id.
+     * @param committedOffset   A specified committed offset corresponding to this shard.
+     *
+     * @return A StreamsGroup.
+     * @throws GroupIdNotFoundException if the group does not exist or is not a streams group.
+     */
+    public StreamsGroup streamsGroup(

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3138,6 +3274,45 @@ private void replaceMember(
         ));
     }
 
+    /**
+     * Fences a member from a streams group and maybe downgrade the streams group to a classic group.
+     *
+     * @param group     The group.
+     * @param member    The member.
+     * @param response  The response of the CoordinatorResult.
+     *
+     * @return The CoordinatorResult to be applied.
+     */
+    private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(
+        StreamsGroup group,
+        StreamsGroupMember member,
+        T response
+    ) {
+        List<CoordinatorRecord> records = new ArrayList<>();
+        removeStreamsMember(records, group.groupId(), member.memberId());
+
+        // We bump the group epoch.
+        int groupEpoch = group.groupEpoch() + 1;
+        records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch));
+
+        cancelTimers(group.groupId(), member.memberId());
+
+        return new CoordinatorResult<>(records, response);
+    }
+
+    /**
+     * Write tombstones for the member. The order matters here.
+     *
+     * @param records       The list of records to append the member assignment tombstone records.
+     * @param groupId       The group id.
+     * @param memberId      The member id.
+     */
+    private void removeStreamsMember(List<CoordinatorRecord> records, String groupId, String memberId) {

Review Comment:
   Refactored, but it might become annoying in the heartbeat when we assemble
a larger collection of records.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3962,7 +4359,167 @@ public void replay(
             }
             removeGroup(groupId);
         }
+    }
+
+    /**
+     * Replays StreamsGroupMemberMetadataKey/Value to update the hard state of
+     * the streams group.
+     * It updates the subscription part of the member or deletes the member.
+     *
+     * @param key   A StreamsGroupMemberMetadataKey key.
+     * @param value A StreamsGroupMemberMetadataValue record.
+     */
+    public void replay(
+        StreamsGroupMemberMetadataKey key,
+        StreamsGroupMemberMetadataValue value
+    ) {
+        String groupId = key.groupId();
+        String memberId = key.memberId();
+
+        StreamsGroup streamsGroup;
+        try {
+            streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
+        } catch (GroupIdNotFoundException ex) {
+            // If the group does not exist and a tombstone is replayed, we can ignore it.
+            return;
+        }
+
+        if (value != null) {
+            StreamsGroupMember oldMember = streamsGroup.getOrMaybeCreateMember(memberId, true);
+            streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember)
+                .updateWith(value)
+                .build());

Review Comment:
   I understand, but I don't think the pattern is too crazy. In the RPC
handlers, we don't want to modify the internal state of the group, so we have
"getters" and "updaters" rather than operations that do both. That's why
`getOrMaybeCreate` does not add the member to the group. In a sense,
`getOrMaybeCreate` is very much like `getOrDefault`, just with a specific
default implementation. That seems like a valid pattern to me, but I agree that
one can be misled into thinking that "creation" will also add the member to
the streams group.
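
   A minimal sketch of the getter/updater split described above, under stated assumptions: `GetOrMaybeCreateSketch`, `Group`, `Member`, and `hasMember` are hypothetical stand-ins rather than the real `StreamsGroup` and `StreamsGroupMember` classes, and `NoSuchElementException` merely takes the place of whatever exception the real code throws for an unknown member. The sketch shows that the "get or maybe create" call returns a fresh default object without registering it, and that only the explicit update changes the group.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Illustrative sketch only; these classes are hypothetical, not Kafka's.
public class GetOrMaybeCreateSketch {
    static final class Member {
        final String memberId;
        final int epoch;

        Member(String memberId, int epoch) {
            this.memberId = memberId;
            this.epoch = epoch;
        }
    }

    static final class Group {
        private final Map<String, Member> members = new HashMap<>();

        // "Getter": behaves like getOrDefault with a specific default.
        // It never modifies the members map.
        Member getOrMaybeCreateMember(String memberId, boolean createIfNotExists) {
            Member existing = members.get(memberId);
            if (existing != null) {
                return existing;
            }
            if (!createIfNotExists) {
                throw new NoSuchElementException("Member " + memberId + " not found.");
            }
            return new Member(memberId, 0);
        }

        // "Updater": the only operation that changes the group's state.
        void updateMember(Member member) {
            members.put(member.memberId, member);
        }

        boolean hasMember(String memberId) {
            return members.containsKey(memberId);
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        Member member = group.getOrMaybeCreateMember("member-1", true);
        System.out.println(group.hasMember("member-1")); // not registered yet
        group.updateMember(member);
        System.out.println(group.hasMember("member-1")); // registered by the updater
    }
}
```

   Keeping the two steps separate means a caller that only wants to inspect a member cannot change the group by accident, at the cost of the naming ambiguity mentioned in the comment.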



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
