cadonna commented on code in PR #18729:
URL: https://github.com/apache/kafka/pull/18729#discussion_r1941239557


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with their current member 
assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;
+
+    /**
+     * These maps map each active/standby/warmup task to the process ID(s) of 
their current owner. When a
+     * member revokes a partition, it removes its process ID from this map. 
When a member gets a partition, it adds its process ID to this map.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentActiveTaskProcessId;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentStandbyTaskProcessIds;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentWarmupTaskProcessIds;

Review Comment:
   The above comment also applies to this data structure.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with their current member 
assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;

Review Comment:
   Could you please add some more info to the javadoc? This data structure is a 
bit hard to grasp.
   A visualization like the following would help:
   ```
   subtopology -> partition -> memberId
   ```
   I am wondering if using a class like `TaskId` for the key would make the 
code simpler.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with their current member 
assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;
+
+    /**
+     * These maps map each active/standby/warmup task to the process ID(s) of 
their current owner. When a
+     * member revokes a partition, it removes its process ID from this map. 
When a member gets a partition, it adds its process ID to this map.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentActiveTaskProcessId;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentStandbyTaskProcessIds;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentWarmupTaskProcessIds;

Review Comment:
   What do you think of renaming this data structures to 
`currrentXTaskToProcessId`? 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with their current member 
assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;
+
+    /**
+     * These maps map each active/standby/warmup task to the process ID(s) of 
their current owner. When a
+     * member revokes a partition, it removes its process ID from this map. 
When a member gets a partition, it adds its process ID to this map.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentActiveTaskProcessId;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentStandbyTaskProcessIds;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentWarmupTaskProcessIds;
+
+    /**
+     * The coordinator metrics.
+     */
+    private final GroupCoordinatorMetricsShard metrics;
+
+    /**
+     * The Streams topology.
+     */
+    private final TimelineObject<Optional<StreamsTopology>> topology;
+
+    /**
+     * The configured topology including resolved regular expressions.
+     */
+    private final TimelineObject<Optional<ConfiguredTopology>> 
configuredTopology;
+
+    /**
+     * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with the group epoch at the time of setting it.
+     * The metadata refresh time is considered as a soft state (read that it 
is not stored in a timeline data structure). It is like this
+     * because it is not persisted to the log. The group epoch is here to 
ensure that the metadata refresh deadline is invalidated if the
+     * group epoch does not correspond to the current group epoch. This can 
happen if the metadata refresh deadline is updated after having
+     * refreshed the metadata but the write operation failed. In this case, 
the time is not automatically rolled back.
+     */
+    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
+    public StreamsGroup(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        String groupId,
+        GroupCoordinatorMetricsShard metrics
+    ) {
+        this.log = logContext.logger(StreamsGroup.class);
+        this.logContext = logContext;
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetActiveTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetStandbyTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetWarmupTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentActiveTaskProcessId = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentStandbyTaskProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentWarmupTaskProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.metrics = Objects.requireNonNull(metrics);
+        this.topology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+        this.configuredTopology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+    }
+
+    /**
+     * @return The group type (Streams).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.STREAMS;
+    }
+
+    /**
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return the group formatted as a list group response based on the 
committed offset.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long 
committedOffset) {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(PROTOCOL_TYPE)
+            .setGroupState(state.get(committedOffset).toString())
+            .setGroupType(type().toString());
+    }
+
+    public ConfiguredTopology configuredTopology() {
+        return configuredTopology.get().orElse(null);
+    }
+
+    public StreamsTopology topology() {
+        return topology.get().orElse(null);
+    }
+
+    public void setTopology(StreamsTopology topology) {
+        this.topology.set(Optional.of(topology));
+        maybeUpdateConfiguredTopology();
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The group id.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * @return The current state.
+     */
+    public StreamsGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * @return The current state based on committed offset.
+     */
+    public StreamsGroupState state(long committedOffset) {
+        return state.get(committedOffset);
+    }
+
+    /**
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The target assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return targetAssignmentEpoch.get();
+    }
+
+    /**
+     * Sets the assignment epoch.
+     *
+     * @param targetAssignmentEpoch The new assignment epoch.
+     */
+    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Get member id of a static member that matches the given group instance 
id.
+     *
+     * @param groupInstanceId The group instance id.
+     * @return The member id corresponding to the given instance id or null if 
it does not exist
+     */
+    public String staticMemberId(String groupInstanceId) {
+        return staticMembers.get(groupInstanceId);
+    }
+
+    /**
+     * Gets or creates a new member but without adding it to the group. Adding 
a member is done via the
+     * {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Booleans indicating whether the member must be 
created if it does not exist.
+     * @return A StreamsGroupMember.
+     */
+    public StreamsGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        StreamsGroupMember member = members.get(memberId);
+        if (member != null) {
+            return member;
+        }
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", 
memberId, groupId)
+            );
+        }
+
+        return new StreamsGroupMember.Builder(memberId).build();
+    }
+
+    /**
+     * Gets a static member.
+     *
+     * @param instanceId The group instance id.
+     * @return The member corresponding to the given instance id or null if it 
does not exist
+     */
+    public StreamsGroupMember staticMember(String instanceId) {
+        String existingMemberId = staticMemberId(instanceId);
+        return existingMemberId == null ? null : 
getOrMaybeCreateMember(existingMemberId, false);
+    }
+
+    /**
+     * Adds or updates the member.
+     *
+     * @param newMember The new member state.
+     */
+    public void updateMember(StreamsGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+        StreamsGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
+        maybeUpdateTaskProcessId(oldMember, newMember);
+        updateStaticMember(newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Updates the member id stored against the instance id if the member is a 
static member.
+     *
+     * @param newMember The new member state.
+     */
+    private void updateStaticMember(StreamsGroupMember newMember) {
+        if (newMember.instanceId() != null && 
newMember.instanceId().isPresent()) {
+            staticMembers.put(newMember.instanceId().get(), 
newMember.memberId());
+        }
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        StreamsGroupMember oldMember = members.remove(memberId);
+        maybeRemoveTaskProcessId(oldMember);
+        removeStaticMember(oldMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the static member mapping if the removed member is static.
+     *
+     * @param oldMember The member to remove.
+     */
+    private void removeStaticMember(StreamsGroupMember oldMember) {
+        if (oldMember.instanceId() != null && 
oldMember.instanceId().isPresent()) {
+            staticMembers.remove(oldMember.instanceId().get());
+        }
+    }
+
+    /**
+     * Returns true if the member exists.
+     *
+     * @param memberId The member id.
+     * @return A boolean indicating whether the member exists or not.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * @return An immutable Map containing all the members keyed by their id.
+     */
+    public Map<String, StreamsGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * @return An immutable Map containing all the static members keyed by 
instance id.
+     */
+    public Map<String, String> staticMembers() {
+        return Collections.unmodifiableMap(staticMembers);
+    }
+
+    /**
+     * Returns the target assignment of the member.
+     *
+     * @return The StreamsGroupMemberAssignment or an EMPTY one if it does not 
exist.
+     */
+    public TasksTuple targetAssignment(String memberId) {
+        return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
+    }
+
+    /**
+     * @return An immutable map containing all the topic partitions with their 
current member assignments.
+     */
+    public Map<String, Map<Integer, String>> 
invertedTargetActiveTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetActiveTasksAssignment);
+    }
+
+    public Map<String, Map<Integer, String>> 
invertedTargetStandbyTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetStandbyTasksAssignment);
+    }
+
+    public Map<String, Map<Integer, String>> 
invertedTargetWarmupTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetWarmupTasksAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
+     *
+     * @param memberId            The member id.
+     * @param newTargetAssignment The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, TasksTuple 
newTargetAssignment) {
+        updateInvertedTargetActiveTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        updateInvertedTargetStandbyTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        updateInvertedTargetWarmupTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        targetAssignment.put(memberId, newTargetAssignment);
+    }
+
+
+    private void updateInvertedTargetActiveTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetActiveTasksAssignment
+        );
+    }
+
+    private void updateInvertedTargetStandbyTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetStandbyTasksAssignment
+        );
+    }
+
+    private void updateInvertedTargetWarmupTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetWarmupTasksAssignment
+        );
+    }
+
+    /**
+     * Updates the reverse lookup map of the target assignment.
+     *
+     * @param memberId            The member Id.
+     * @param oldTargetAssignment The old target assignment.
+     * @param newTargetAssignment The new target assignment.
+     */
+    private void updateInvertedTargetAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment,
+        TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<String> allSubtopologyIds = new HashSet<>();
+        allSubtopologyIds.addAll(oldTargetAssignment.activeTasks().keySet());
+        allSubtopologyIds.addAll(newTargetAssignment.activeTasks().keySet());
+
+        for (String subtopologyId : allSubtopologyIds) {
+            Set<Integer> oldPartitions = 
oldTargetAssignment.activeTasks().getOrDefault(subtopologyId, 
Collections.emptySet());
+            Set<Integer> newPartitions = 
newTargetAssignment.activeTasks().getOrDefault(subtopologyId, 
Collections.emptySet());
+
+            TimelineHashMap<Integer, String> taskPartitionAssignment = 
invertedTargetAssignment.computeIfAbsent(
+                subtopologyId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment 
only if the partition is currently
+            // still assigned to the member in question.
+            // If p0 was moved from A to B, and the target assignment map was 
updated for B first, we don't want to
+            // remove the key p0 from the inverted map and undo the action 
when A eventually tries to update its assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && 
memberId.equals(taskPartitionAssignment.get(partition))) {
+                    taskPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in the new assignment but not in the 
old assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    taskPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            if (taskPartitionAssignment.isEmpty()) {
+                invertedTargetAssignment.remove(subtopologyId);
+            } else {
+                invertedTargetAssignment.put(subtopologyId, 
taskPartitionAssignment);
+            }
+        }
+    }
+
+    /**
+     * Removes the target assignment of a member.
+     *
+     * @param memberId The member id.
+     */
+    public void removeTargetAssignment(String memberId) {
+        updateInvertedTargetActiveTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        updateInvertedTargetStandbyTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        updateInvertedTargetWarmupTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        targetAssignment.remove(memberId);
+    }
+
+    /**
+     * @return An immutable Map containing all the target assignment keyed by 
member id.
+     */
+    public Map<String, TasksTuple> targetAssignment() {
+        return Collections.unmodifiableMap(targetAssignment);
+    }
+
+    /**
+     * Returns the current processId of a task or null if the task does not 
have one.

Review Comment:
   ```suggestion
        * Returns the current process ID of a task or null if the task does not 
have one.
   ```
   Here and in other places.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with their current member 
assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;

Review Comment:
   What do you think of renaming this data structures to 
`currrentXTaskToMemberId`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with their current member 
assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;
+
+    /**
+     * These maps map each active/standby/warmup task to the process ID(s) of 
their current owner. When a
+     * member revokes a partition, it removes its process ID from this map. 
When a member gets a partition, it adds its process ID to this map.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentActiveTaskProcessId;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentStandbyTaskProcessIds;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentWarmupTaskProcessIds;
+
+    /**
+     * The coordinator metrics.
+     */
+    private final GroupCoordinatorMetricsShard metrics;
+
+    /**
+     * The Streams topology.
+     */
+    private final TimelineObject<Optional<StreamsTopology>> topology;
+
+    /**
+     * The configured topology including resolved regular expressions.
+     */
+    private final TimelineObject<Optional<ConfiguredTopology>> 
configuredTopology;
+
+    /**
+     * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with the group epoch at the time of setting it.
+     * The metadata refresh time is considered as a soft state (read that it 
is not stored in a timeline data structure). It is like this
+     * because it is not persisted to the log. The group epoch is here to 
ensure that the metadata refresh deadline is invalidated if the
+     * group epoch does not correspond to the current group epoch. This can 
happen if the metadata refresh deadline is updated after having
+     * refreshed the metadata but the write operation failed. In this case, 
the time is not automatically rolled back.
+     */
+    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
+    public StreamsGroup(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        String groupId,
+        GroupCoordinatorMetricsShard metrics
+    ) {
+        this.log = logContext.logger(StreamsGroup.class);
+        this.logContext = logContext;
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetActiveTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetStandbyTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetWarmupTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentActiveTaskProcessId = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentStandbyTaskProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentWarmupTaskProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.metrics = Objects.requireNonNull(metrics);
+        this.topology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+        this.configuredTopology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+    }
+
+    /**
+     * @return The group type (Streams).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.STREAMS;
+    }
+
+    /**
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return the group formatted as a list group response based on the 
committed offset.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long 
committedOffset) {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(PROTOCOL_TYPE)
+            .setGroupState(state.get(committedOffset).toString())
+            .setGroupType(type().toString());
+    }
+
+    public ConfiguredTopology configuredTopology() {
+        return configuredTopology.get().orElse(null);
+    }
+
+    public StreamsTopology topology() {
+        return topology.get().orElse(null);
+    }
+
+    public void setTopology(StreamsTopology topology) {
+        this.topology.set(Optional.of(topology));
+        maybeUpdateConfiguredTopology();
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The group id.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * @return The current state.
+     */
+    public StreamsGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * @return The current state based on committed offset.
+     */
+    public StreamsGroupState state(long committedOffset) {
+        return state.get(committedOffset);
+    }
+
+    /**
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The target assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return targetAssignmentEpoch.get();
+    }
+
+    /**
+     * Sets the assignment epoch.
+     *
+     * @param targetAssignmentEpoch The new assignment epoch.
+     */
+    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Get member id of a static member that matches the given group instance 
id.
+     *
+     * @param groupInstanceId The group instance id.
+     * @return The member id corresponding to the given instance id or null if 
it does not exist
+     */
+    public String staticMemberId(String groupInstanceId) {
+        return staticMembers.get(groupInstanceId);
+    }
+
+    /**
+     * Gets or creates a new member but without adding it to the group. Adding 
a member is done via the
+     * {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Booleans indicating whether the member must be 
created if it does not exist.
+     * @return A StreamsGroupMember.
+     */
+    public StreamsGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        StreamsGroupMember member = members.get(memberId);
+        if (member != null) {
+            return member;
+        }
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", 
memberId, groupId)
+            );
+        }
+
+        return new StreamsGroupMember.Builder(memberId).build();
+    }
+
+    /**
+     * Gets a static member.
+     *
+     * @param instanceId The group instance id.
+     * @return The member corresponding to the given instance id or null if it 
does not exist
+     */
+    public StreamsGroupMember staticMember(String instanceId) {
+        String existingMemberId = staticMemberId(instanceId);
+        return existingMemberId == null ? null : 
getOrMaybeCreateMember(existingMemberId, false);
+    }
+
+    /**
+     * Adds or updates the member.
+     *
+     * @param newMember The new member state.
+     */
+    public void updateMember(StreamsGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+        StreamsGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
+        maybeUpdateTaskProcessId(oldMember, newMember);
+        updateStaticMember(newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Updates the member id stored against the instance id if the member is a 
static member.
+     *
+     * @param newMember The new member state.
+     */
+    private void updateStaticMember(StreamsGroupMember newMember) {
+        if (newMember.instanceId() != null && 
newMember.instanceId().isPresent()) {
+            staticMembers.put(newMember.instanceId().get(), 
newMember.memberId());
+        }
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        StreamsGroupMember oldMember = members.remove(memberId);
+        maybeRemoveTaskProcessId(oldMember);
+        removeStaticMember(oldMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the static member mapping if the removed member is static.
+     *
+     * @param oldMember The member to remove.
+     */
+    private void removeStaticMember(StreamsGroupMember oldMember) {
+        if (oldMember.instanceId() != null && 
oldMember.instanceId().isPresent()) {
+            staticMembers.remove(oldMember.instanceId().get());
+        }
+    }
+
+    /**
+     * Returns true if the member exists.
+     *
+     * @param memberId The member id.
+     * @return A boolean indicating whether the member exists or not.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * @return An immutable Map containing all the members keyed by their id.
+     */
+    public Map<String, StreamsGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * @return An immutable Map containing all the static members keyed by 
instance id.
+     */
+    public Map<String, String> staticMembers() {
+        return Collections.unmodifiableMap(staticMembers);
+    }
+
+    /**
+     * Returns the target assignment of the member.
+     *
+     * @return The StreamsGroupMemberAssignment or an EMPTY one if it does not 
exist.
+     */
+    public TasksTuple targetAssignment(String memberId) {
+        return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
+    }
+
+    /**
+     * @return An immutable map containing all the topic partitions with their 
current member assignments.
+     */
+    public Map<String, Map<Integer, String>> 
invertedTargetActiveTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetActiveTasksAssignment);
+    }
+
+    public Map<String, Map<Integer, String>> 
invertedTargetStandbyTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetStandbyTasksAssignment);
+    }
+
+    public Map<String, Map<Integer, String>> 
invertedTargetWarmupTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetWarmupTasksAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
+     *
+     * @param memberId            The member id.
+     * @param newTargetAssignment The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, TasksTuple 
newTargetAssignment) {
+        updateInvertedTargetActiveTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        updateInvertedTargetStandbyTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        updateInvertedTargetWarmupTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        targetAssignment.put(memberId, newTargetAssignment);
+    }
+
+
+    private void updateInvertedTargetActiveTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetActiveTasksAssignment
+        );
+    }
+
+    private void updateInvertedTargetStandbyTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetStandbyTasksAssignment
+        );
+    }
+
+    private void updateInvertedTargetWarmupTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetWarmupTasksAssignment
+        );
+    }
+
+    /**
+     * Updates the reverse lookup map of the target assignment.
+     *
+     * @param memberId            The member Id.
+     * @param oldTargetAssignment The old target assignment.
+     * @param newTargetAssignment The new target assignment.
+     */
+    private void updateInvertedTargetAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment,
+        TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<String> allSubtopologyIds = new HashSet<>();
+        allSubtopologyIds.addAll(oldTargetAssignment.activeTasks().keySet());
+        allSubtopologyIds.addAll(newTargetAssignment.activeTasks().keySet());
+
+        for (String subtopologyId : allSubtopologyIds) {
+            Set<Integer> oldPartitions = 
oldTargetAssignment.activeTasks().getOrDefault(subtopologyId, 
Collections.emptySet());
+            Set<Integer> newPartitions = 
newTargetAssignment.activeTasks().getOrDefault(subtopologyId, 
Collections.emptySet());
+
+            TimelineHashMap<Integer, String> taskPartitionAssignment = 
invertedTargetAssignment.computeIfAbsent(
+                subtopologyId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment 
only if the partition is currently
+            // still assigned to the member in question.
+            // If p0 was moved from A to B, and the target assignment map was 
updated for B first, we don't want to
+            // remove the key p0 from the inverted map and undo the action 
when A eventually tries to update its assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && 
memberId.equals(taskPartitionAssignment.get(partition))) {
+                    taskPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in the new assignment but not in the 
old assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    taskPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            if (taskPartitionAssignment.isEmpty()) {
+                invertedTargetAssignment.remove(subtopologyId);
+            } else {
+                invertedTargetAssignment.put(subtopologyId, 
taskPartitionAssignment);
+            }
+        }
+    }
+
+    /**
+     * Removes the target assignment of a member.
+     *
+     * @param memberId The member id.
+     */
+    public void removeTargetAssignment(String memberId) {
+        updateInvertedTargetActiveTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        updateInvertedTargetStandbyTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        updateInvertedTargetWarmupTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        targetAssignment.remove(memberId);
+    }
+
+    /**
+     * @return An immutable Map containing all the target assignment keyed by 
member id.
+     */
+    public Map<String, TasksTuple> targetAssignment() {
+        return Collections.unmodifiableMap(targetAssignment);
+    }
+
+    /**
+     * Returns the current processId of a task or null if the task does not 
have one.
+     *
+     * @param subtopologyId     The topic id.
+     * @param taskId The task id.
+     * @return The processId or null.
+     */
+    public String currentActiveTaskProcessId(
+        String subtopologyId, int taskId
+    ) {
+        Map<Integer, String> tasks = 
currentActiveTaskProcessId.get(subtopologyId);
+        if (tasks == null) {
+            return null;
+        } else {
+            return tasks.getOrDefault(taskId, null);
+        }

Review Comment:
   Does it make sense to change this to `Optional`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with their current member 
assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;
+
+    /**
+     * These maps map each active/standby/warmup task to the process ID(s) of 
their current owner. When a
+     * member revokes a partition, it removes its process ID from this map. 
When a member gets a partition, it adds its process ID to this map.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentActiveTaskProcessId;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentStandbyTaskProcessIds;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentWarmupTaskProcessIds;
+
+    /**
+     * The coordinator metrics.
+     */
+    private final GroupCoordinatorMetricsShard metrics;
+
+    /**
+     * The Streams topology.
+     */
+    private final TimelineObject<Optional<StreamsTopology>> topology;
+
+    /**
+     * The configured topology including resolved regular expressions.
+     */
+    private final TimelineObject<Optional<ConfiguredTopology>> 
configuredTopology;
+
+    /**
+     * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with the group epoch at the time of setting it.
+     * The metadata refresh time is considered as a soft state (read that it 
is not stored in a timeline data structure). It is like this
+     * because it is not persisted to the log. The group epoch is here to 
ensure that the metadata refresh deadline is invalidated if the
+     * group epoch does not correspond to the current group epoch. This can 
happen if the metadata refresh deadline is updated after having
+     * refreshed the metadata but the write operation failed. In this case, 
the time is not automatically rolled back.
+     */
+    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
+    public StreamsGroup(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        String groupId,
+        GroupCoordinatorMetricsShard metrics
+    ) {
+        this.log = logContext.logger(StreamsGroup.class);
+        this.logContext = logContext;
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetActiveTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetStandbyTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetWarmupTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentActiveTaskProcessId = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentStandbyTaskProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentWarmupTaskProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.metrics = Objects.requireNonNull(metrics);
+        this.topology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+        this.configuredTopology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+    }
+
+    /**
+     * @return The group type (Streams).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.STREAMS;
+    }
+
+    /**
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return the group formatted as a list group response based on the 
committed offset.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long 
committedOffset) {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(PROTOCOL_TYPE)
+            .setGroupState(state.get(committedOffset).toString())
+            .setGroupType(type().toString());
+    }
+
+    public ConfiguredTopology configuredTopology() {
+        return configuredTopology.get().orElse(null);
+    }
+
+    public StreamsTopology topology() {
+        return topology.get().orElse(null);
+    }
+
+    public void setTopology(StreamsTopology topology) {
+        this.topology.set(Optional.of(topology));
+        maybeUpdateConfiguredTopology();
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The group id.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * @return The current state.
+     */
+    public StreamsGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * @return The current state based on committed offset.
+     */
+    public StreamsGroupState state(long committedOffset) {
+        return state.get(committedOffset);
+    }
+
+    /**
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The target assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return targetAssignmentEpoch.get();
+    }
+
+    /**
+     * Sets the assignment epoch.
+     *
+     * @param targetAssignmentEpoch The new assignment epoch.
+     */
+    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Get member id of a static member that matches the given group instance 
id.
+     *
+     * @param groupInstanceId The group instance id.
+     * @return The member id corresponding to the given instance id or null if 
it does not exist
+     */
+    public String staticMemberId(String groupInstanceId) {
+        return staticMembers.get(groupInstanceId);
+    }
+
+    /**
+     * Gets or creates a new member but without adding it to the group. Adding 
a member is done via the
+     * {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Booleans indicating whether the member must be 
created if it does not exist.
+     * @return A StreamsGroupMember.
+     */
+    public StreamsGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        StreamsGroupMember member = members.get(memberId);
+        if (member != null) {
+            return member;
+        }
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", 
memberId, groupId)
+            );
+        }
+
+        return new StreamsGroupMember.Builder(memberId).build();
+    }
+
+    /**
+     * Gets a static member.
+     *
+     * @param instanceId The group instance id.
+     * @return The member corresponding to the given instance id or null if it 
does not exist
+     */
+    public StreamsGroupMember staticMember(String instanceId) {
+        String existingMemberId = staticMemberId(instanceId);
+        return existingMemberId == null ? null : 
getOrMaybeCreateMember(existingMemberId, false);
+    }
+
+    /**
+     * Adds or updates the member.
+     *
+     * @param newMember The new member state.
+     */
+    public void updateMember(StreamsGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+        StreamsGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
+        maybeUpdateTaskProcessId(oldMember, newMember);
+        updateStaticMember(newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Updates the member id stored against the instance id if the member is a 
static member.
+     *
+     * @param newMember The new member state.
+     */
+    private void updateStaticMember(StreamsGroupMember newMember) {
+        if (newMember.instanceId() != null && 
newMember.instanceId().isPresent()) {
+            staticMembers.put(newMember.instanceId().get(), 
newMember.memberId());
+        }
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        StreamsGroupMember oldMember = members.remove(memberId);
+        maybeRemoveTaskProcessId(oldMember);
+        removeStaticMember(oldMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the static member mapping if the removed member is static.
+     *
+     * @param oldMember The member to remove.
+     */
+    private void removeStaticMember(StreamsGroupMember oldMember) {
+        if (oldMember.instanceId() != null && 
oldMember.instanceId().isPresent()) {
+            staticMembers.remove(oldMember.instanceId().get());
+        }
+    }
+
+    /**
+     * Returns true if the member exists.
+     *
+     * @param memberId The member id.
+     * @return A boolean indicating whether the member exists or not.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * @return An immutable Map containing all the members keyed by their id.
+     */
+    public Map<String, StreamsGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * @return An immutable Map containing all the static members keyed by 
instance id.
+     */
+    public Map<String, String> staticMembers() {
+        return Collections.unmodifiableMap(staticMembers);
+    }
+
+    /**
+     * Returns the target assignment of the member.
+     *
+     * @return The StreamsGroupMemberAssignment or an EMPTY one if it does not 
exist.
+     */
+    public TasksTuple targetAssignment(String memberId) {
+        return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
+    }
+
+    /**
+     * @return An immutable map containing all the topic partitions with their 
current member assignments.
+     */
+    public Map<String, Map<Integer, String>> 
invertedTargetActiveTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetActiveTasksAssignment);
+    }
+
+    public Map<String, Map<Integer, String>> 
invertedTargetStandbyTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetStandbyTasksAssignment);
+    }
+
+    public Map<String, Map<Integer, String>> 
invertedTargetWarmupTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetWarmupTasksAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
+     *
+     * @param memberId            The member id.
+     * @param newTargetAssignment The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, TasksTuple 
newTargetAssignment) {
+        updateInvertedTargetActiveTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        updateInvertedTargetStandbyTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        updateInvertedTargetWarmupTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        targetAssignment.put(memberId, newTargetAssignment);
+    }
+
+
+    private void updateInvertedTargetActiveTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetActiveTasksAssignment
+        );
+    }
+
+    private void updateInvertedTargetStandbyTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetStandbyTasksAssignment
+        );
+    }
+
+    private void updateInvertedTargetWarmupTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetWarmupTasksAssignment
+        );
+    }
+
+    /**
+     * Updates the reverse lookup map of the target assignment.
+     *
+     * @param memberId            The member Id.
+     * @param oldTargetAssignment The old target assignment.
+     * @param newTargetAssignment The new target assignment.
+     */
+    private void updateInvertedTargetAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment,
+        TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<String> allSubtopologyIds = new HashSet<>();
+        allSubtopologyIds.addAll(oldTargetAssignment.activeTasks().keySet());
+        allSubtopologyIds.addAll(newTargetAssignment.activeTasks().keySet());
+
+        for (String subtopologyId : allSubtopologyIds) {
+            Set<Integer> oldPartitions = 
oldTargetAssignment.activeTasks().getOrDefault(subtopologyId, 
Collections.emptySet());
+            Set<Integer> newPartitions = 
newTargetAssignment.activeTasks().getOrDefault(subtopologyId, 
Collections.emptySet());
+
+            TimelineHashMap<Integer, String> taskPartitionAssignment = 
invertedTargetAssignment.computeIfAbsent(
+                subtopologyId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment 
only if the partition is currently
+            // still assigned to the member in question.
+            // If p0 was moved from A to B, and the target assignment map was 
updated for B first, we don't want to
+            // remove the key p0 from the inverted map and undo the action 
when A eventually tries to update its assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && 
memberId.equals(taskPartitionAssignment.get(partition))) {
+                    taskPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in the new assignment but not in the 
old assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    taskPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            if (taskPartitionAssignment.isEmpty()) {
+                invertedTargetAssignment.remove(subtopologyId);
+            } else {
+                invertedTargetAssignment.put(subtopologyId, 
taskPartitionAssignment);
+            }
+        }
+    }
+
+    /**
+     * Removes the target assignment of a member.
+     *
+     * @param memberId The member id.

Review Comment:
   ```suggestion
        * @param memberId The member ID.
   ```
   Here and in other java docs.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with their current member 
assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;
+
+    /**
+     * These maps map each active/standby/warmup task to the process ID(s) of 
their current owner. When a
+     * member revokes a partition, it removes its process ID from this map. 
When a member gets a partition, it adds its process ID to this map.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentActiveTaskProcessId;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentStandbyTaskProcessIds;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentWarmupTaskProcessIds;
+
+    /**
+     * The coordinator metrics.
+     */
+    private final GroupCoordinatorMetricsShard metrics;
+
+    /**
+     * The Streams topology.
+     */
+    private final TimelineObject<Optional<StreamsTopology>> topology;
+
+    /**
+     * The configured topology including resolved regular expressions.
+     */
+    private final TimelineObject<Optional<ConfiguredTopology>> 
configuredTopology;
+
+    /**
+     * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with the group epoch at the time of setting it.
+     * The metadata refresh time is considered as a soft state (read that it 
is not stored in a timeline data structure). It is like this
+     * because it is not persisted to the log. The group epoch is here to 
ensure that the metadata refresh deadline is invalidated if the
+     * group epoch does not correspond to the current group epoch. This can 
happen if the metadata refresh deadline is updated after having
+     * refreshed the metadata but the write operation failed. In this case, 
the time is not automatically rolled back.
+     */
+    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
+    public StreamsGroup(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        String groupId,
+        GroupCoordinatorMetricsShard metrics
+    ) {
+        this.log = logContext.logger(StreamsGroup.class);
+        this.logContext = logContext;
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetActiveTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetStandbyTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetWarmupTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentActiveTaskProcessId = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentStandbyTaskProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentWarmupTaskProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.metrics = Objects.requireNonNull(metrics);
+        this.topology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+        this.configuredTopology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+    }
+
+    /**
+     * @return The group type (Streams).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.STREAMS;
+    }
+
+    /**
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return the group formatted as a list group response based on the 
committed offset.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long 
committedOffset) {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(PROTOCOL_TYPE)
+            .setGroupState(state.get(committedOffset).toString())
+            .setGroupType(type().toString());
+    }
+
+    public ConfiguredTopology configuredTopology() {
+        return configuredTopology.get().orElse(null);
+    }
+
+    public StreamsTopology topology() {
+        return topology.get().orElse(null);
+    }
+
+    public void setTopology(StreamsTopology topology) {
+        this.topology.set(Optional.of(topology));
+        maybeUpdateConfiguredTopology();
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The group id.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * @return The current state.
+     */
+    public StreamsGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * @return The current state based on committed offset.
+     */
+    public StreamsGroupState state(long committedOffset) {
+        return state.get(committedOffset);
+    }
+
+    /**
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The target assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return targetAssignmentEpoch.get();
+    }
+
+    /**
+     * Sets the assignment epoch.
+     *
+     * @param targetAssignmentEpoch The new assignment epoch.
+     */
+    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Get member id of a static member that matches the given group instance 
id.
+     *
+     * @param groupInstanceId The group instance id.
+     * @return The member id corresponding to the given instance id or null if 
it does not exist
+     */
+    public String staticMemberId(String groupInstanceId) {
+        return staticMembers.get(groupInstanceId);
+    }
+
+    /**
+     * Gets or creates a new member but without adding it to the group. Adding 
a member is done via the
+     * {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Booleans indicating whether the member must be 
created if it does not exist.
+     * @return A StreamsGroupMember.
+     */
+    public StreamsGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        StreamsGroupMember member = members.get(memberId);
+        if (member != null) {
+            return member;
+        }
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", 
memberId, groupId)
+            );
+        }
+
+        return new StreamsGroupMember.Builder(memberId).build();
+    }
+
+    /**
+     * Gets a static member.
+     *
+     * @param instanceId The group instance id.
+     * @return The member corresponding to the given instance id or null if it 
does not exist
+     */
+    public StreamsGroupMember staticMember(String instanceId) {
+        String existingMemberId = staticMemberId(instanceId);
+        return existingMemberId == null ? null : 
getOrMaybeCreateMember(existingMemberId, false);
+    }
+
+    /**
+     * Adds or updates the member.
+     *
+     * @param newMember The new member state.
+     */
+    public void updateMember(StreamsGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+        StreamsGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
+        maybeUpdateTaskProcessId(oldMember, newMember);
+        updateStaticMember(newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Updates the member id stored against the instance id if the member is a 
static member.
+     *
+     * @param newMember The new member state.
+     */
+    private void updateStaticMember(StreamsGroupMember newMember) {
+        if (newMember.instanceId() != null && 
newMember.instanceId().isPresent()) {
+            staticMembers.put(newMember.instanceId().get(), 
newMember.memberId());
+        }
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        StreamsGroupMember oldMember = members.remove(memberId);
+        maybeRemoveTaskProcessId(oldMember);
+        removeStaticMember(oldMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the static member mapping if the removed member is static.
+     *
+     * @param oldMember The member to remove.
+     */
+    private void removeStaticMember(StreamsGroupMember oldMember) {
+        if (oldMember.instanceId() != null && 
oldMember.instanceId().isPresent()) {
+            staticMembers.remove(oldMember.instanceId().get());
+        }
+    }
+
+    /**
+     * Returns true if the member exists.
+     *
+     * @param memberId The member id.
+     * @return A boolean indicating whether the member exists or not.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * @return An immutable Map containing all the members keyed by their id.
+     */
+    public Map<String, StreamsGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * @return An immutable Map containing all the static members keyed by 
instance id.
+     */
+    public Map<String, String> staticMembers() {
+        return Collections.unmodifiableMap(staticMembers);
+    }
+
+    /**
+     * Returns the target assignment of the member.
+     *
+     * @return The StreamsGroupMemberAssignment or an EMPTY one if it does not 
exist.
+     */
+    public TasksTuple targetAssignment(String memberId) {
+        return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
+    }
+
+    /**
+     * @return An immutable map containing all the topic partitions with their 
current member assignments.
+     */
+    public Map<String, Map<Integer, String>> 
invertedTargetActiveTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetActiveTasksAssignment);
+    }
+
+    public Map<String, Map<Integer, String>> 
invertedTargetStandbyTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetStandbyTasksAssignment);
+    }
+
+    public Map<String, Map<Integer, String>> 
invertedTargetWarmupTasksAssignment() {
+        return 
Collections.unmodifiableMap(invertedTargetWarmupTasksAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
+     *
+     * @param memberId            The member id.
+     * @param newTargetAssignment The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, TasksTuple 
newTargetAssignment) {
+        updateInvertedTargetActiveTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        updateInvertedTargetStandbyTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        updateInvertedTargetWarmupTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new TasksTuple(emptyMap(), 
emptyMap(), emptyMap())),
+            newTargetAssignment
+        );
+        targetAssignment.put(memberId, newTargetAssignment);
+    }
+
+
+    private void updateInvertedTargetActiveTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetActiveTasksAssignment
+        );
+    }
+
+    private void updateInvertedTargetStandbyTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetStandbyTasksAssignment
+        );
+    }
+
+    private void updateInvertedTargetWarmupTasksAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment,
+            newTargetAssignment,
+            invertedTargetWarmupTasksAssignment
+        );
+    }
+
+    /**
+     * Updates the reverse lookup map of the target assignment.
+     *
+     * @param memberId            The member Id.
+     * @param oldTargetAssignment The old target assignment.
+     * @param newTargetAssignment The new target assignment.
+     */
+    private void updateInvertedTargetAssignment(
+        String memberId,
+        TasksTuple oldTargetAssignment,
+        TasksTuple newTargetAssignment,
+        TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<String> allSubtopologyIds = new HashSet<>();
+        allSubtopologyIds.addAll(oldTargetAssignment.activeTasks().keySet());
+        allSubtopologyIds.addAll(newTargetAssignment.activeTasks().keySet());
+
+        for (String subtopologyId : allSubtopologyIds) {
+            Set<Integer> oldPartitions = 
oldTargetAssignment.activeTasks().getOrDefault(subtopologyId, 
Collections.emptySet());
+            Set<Integer> newPartitions = 
newTargetAssignment.activeTasks().getOrDefault(subtopologyId, 
Collections.emptySet());
+
+            TimelineHashMap<Integer, String> taskPartitionAssignment = 
invertedTargetAssignment.computeIfAbsent(
+                subtopologyId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment 
only if the partition is currently
+            // still assigned to the member in question.
+            // If p0 was moved from A to B, and the target assignment map was 
updated for B first, we don't want to
+            // remove the key p0 from the inverted map and undo the action 
when A eventually tries to update its assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && 
memberId.equals(taskPartitionAssignment.get(partition))) {
+                    taskPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in the new assignment but not in the 
old assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    taskPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            if (taskPartitionAssignment.isEmpty()) {
+                invertedTargetAssignment.remove(subtopologyId);
+            } else {
+                invertedTargetAssignment.put(subtopologyId, 
taskPartitionAssignment);
+            }
+        }
+    }
+
+    /**
+     * Removes the target assignment of a member.
+     *
+     * @param memberId The member id.
+     */
+    public void removeTargetAssignment(String memberId) {
+        updateInvertedTargetActiveTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        updateInvertedTargetStandbyTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        updateInvertedTargetWarmupTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY),
+            TasksTuple.EMPTY
+        );
+        targetAssignment.remove(memberId);
+    }
+
+    /**
+     * @return An immutable Map containing all the target assignment keyed by 
member id.

Review Comment:
   ```suggestion
        * @return An immutable map containing all the target assignment keyed 
by member ID.
   ```
   Here and elsewhere



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to