cadonna commented on code in PR #18476:
URL: https://github.com/apache/kafka/pull/18476#discussion_r1925612798


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java:
##########
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskTuple;
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CurrentAssignmentBuilderTest {
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToStable(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member =
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.STABLE)
+                .setProcessId("process")
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(
+                    mkTaskTuple(
+                        taskRole,
+                        mkTasks(subtopologyId1, 1, 2),
+                        mkTasks(subtopologyId2, 3, 4)))
+                .setTasksPendingRevocation(TaskTuple.EMPTY)
+                .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, mkTaskTuple(taskRole,

Review Comment:
   I do not agree. IMO, distinguishing a `10` from an `11` is much harder than 
distinguishing `memberEpoch` from `memberEpoch + 1`, or `10` from `10 + 1` if 
you prefer that.
   That is about readability. In my comment, I was referring more to 
documenting that we specifically test the case where the member epoch of the 
updated member is greater than the member epoch of the original member, and not 
merely that the epochs are different.
   I just wanted to explain where I am coming from. I am fine if you want to 
keep it as it is.
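
   To illustrate the point, here is a minimal, self-contained sketch. `Member` 
and `advanceTo` are hypothetical stand-ins that I made up for this example; they 
are not the classes from the PR and only mimic the epoch bookkeeping that 
`testStableToStable` asserts on.

```java
public class EpochNamingSketch {

    // Hypothetical stand-in for StreamsGroupMember; it only carries the two epochs.
    public record Member(int memberEpoch, int previousMemberEpoch) { }

    // Hypothetical stand-in for the STABLE -> STABLE transition: the member
    // adopts the target epoch and remembers the epoch it came from.
    public static Member advanceTo(Member member, int targetEpoch) {
        return new Member(targetEpoch, member.memberEpoch());
    }

    public static void main(String[] args) {
        int memberEpoch = 10;
        // The "+ 1" documents that the target epoch is greater, not merely different.
        int nextMemberEpoch = memberEpoch + 1;

        Member member = new Member(memberEpoch, memberEpoch);
        Member updatedMember = advanceTo(member, nextMemberEpoch);

        Member expected = new Member(nextMemberEpoch, memberEpoch);
        if (!expected.equals(updatedMember)) {
            throw new AssertionError("unexpected member: " + updatedMember);
        }
        System.out.println(updatedMember);
    }
}
```

   With named epochs, the expected member reads as "moved to the next epoch, 
came from the original one" without comparing two literals by eye.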



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a
+ * member and a desired or target assignment state, the state machine takes the necessary steps to converge them.
+ */
+public class CurrentAssignmentBuilder {
+
+    /**
+     * The streams group member which is reconciled.
+     */
+    private final StreamsGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private TaskTuple targetAssignment;
+
+    /**
+     * A function which returns the current process ID of an active task or null if the active task
+     * is not assigned. The current process ID is the process ID of the current owner.
+     */
+    private BiFunction<String, Integer, String> currentActiveTaskProcessId;
+
+    /**
+     * A function which returns the current process IDs of a standby task or null if the standby
+     * task is not assigned. The current process IDs are the process IDs of all current owners.
+     */
+    private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds;
+
+    /**
+     * A function which returns the current process IDs of a warmup task or null if the warmup task
+     * is not assigned. The current process IDs are the process IDs of all current owners.
+     */
+    private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds;
+
+    /**
+     * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request.
+     */
+    private Optional<TaskTuple> ownedTasks = Optional.empty();
+
+    /**
+     * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member.
+     *
+     * @param member The streams group member that must be reconciled.
+     */
+    public CurrentAssignmentBuilder(StreamsGroupMember member) {
+        this.member = Objects.requireNonNull(member);
+    }
+
+    /**
+     * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to.
+     *
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withTargetAssignment(
+        int targetAssignmentEpoch,
+        TaskTuple targetAssignment
+    ) {
+        this.targetAssignmentEpoch = targetAssignmentEpoch;
+        this.targetAssignment = Objects.requireNonNull(targetAssignment);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction which allows to retrieve the current process ID of an active task. This is
+     * used by the state machine to determine if an active task is free or still used by another
+     * member, and if there is still a task on a specific process that is not yet revoked.
+     *
+     * @param currentActiveTaskProcessId A BiFunction which gets the memberId of a subtopology id /
+     *                                   partition id pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentActiveTaskProcessId(
+        BiFunction<String, Integer, String> currentActiveTaskProcessId
+    ) {
+        this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction which allows to retrieve the current process IDs of a standby task. This is
+     * used by the state machine to determine if there is still a task on a specific process that is
+     * not yet revoked.
+     *
+     * @param currentStandbyTaskProcessIds A BiFunction which gets the memberIds of a subtopology
+     *                                     ids / partition ids pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds(
+        BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds
+    ) {
+        this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction which allows to retrieve the current process IDs of a warmup task. This is
+     * used by the state machine to determine if there is still a task on a specific process that is
+     * not yet revoked.
+     *
+     * @param currentWarmupTaskProcessIds A BiFunction which gets the memberIds of a subtopology ids
+     *                                    / partition ids pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds(
+        BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds
+    ) {
+        this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds);
+        return this;
+    }
+
+    /**
+     * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to
+     * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide
+     * its owned tasks in this heartbeat.
+     *
+     * @param ownedAssignment A collection of active, standby and warm-up tasks
+     * @return This object.
+     */
+    protected CurrentAssignmentBuilder withOwnedAssignment(
+        TaskTuple ownedAssignment
+    ) {
+        this.ownedTasks = Optional.ofNullable(ownedAssignment);
+        return this;
+    }
+
+    /**
+     * Builds the next state for the member or keep the current one if it is not possible to move forward with the current state.
+     *
+     * @return A new StreamsGroupMember or the current one.
+     */
+    public StreamsGroupMember build() {
+        switch (member.state()) {
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedTasks()
+                    );
+                } else {
+                    return member;
+                }
+
+            case UNREVOKED_TASKS:
+                // When the member is in the UNREVOKED_TASKS state, we wait
+                // until the member has revoked the necessary tasks. They are
+                // considered revoked when they are not anymore reported in the
+                // owned tasks set in the StreamsGroupHeartbeat API.
+
+                // If the member provides its owned tasks, we verify if it still
+                // owns any of the revoked tasks. If it did not provide its
+                // owned tasks, or we still own some of the revoked tasks, we
+                // cannot progress.
+                if (
+                    ownedTasks.isEmpty() || ownedTasks.get().containsAny(member.tasksPendingRevocation())
+                ) {
+                    return member;
+                }
+
+                // When the member has revoked all the pending tasks, it can
+                // transition to the next epoch (current + 1) and we can reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,
+                    member.assignedTasks()
+                );
+
+            case UNRELEASED_TASKS:
+                // When the member is in the UNRELEASED_TASKS, we reconcile the
+                // member towards the latest target assignment. This will assign any
+                // of the unreleased tasks when they become available.
+                return computeNextAssignment(
+                    member.memberEpoch(),
+                    member.assignedTasks()
+                );
+
+            case UNKNOWN:
+                // We could only end up in this state if a new state is added in the
+                // future and the group coordinator is downgraded. In this case, the
+                // best option is to fence the member to force it to rejoin the group
+                // without any tasks and to reconcile it again from scratch.
+                if ((ownedTasks.isEmpty() || !ownedTasks.get().isEmpty())) {

Review Comment:
   Got it!
   If I remember correctly, that was also my understanding, but I wanted 
confirmation.
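
   For readers without the thread context, here is a small sketch of what the 
quoted condition evaluates to for the three possible inputs. `Set<String>` is a 
hypothetical stand-in for the task tuple, and the method name is made up for 
this example; what the builder does in each branch is not part of the quoted 
hunk, so the sketch only covers the predicate itself.

```java
import java.util.Optional;
import java.util.Set;

public class OwnedTasksCheckSketch {

    // Same shape as the condition in the UNKNOWN branch. It is true when the
    // member did not report its owned tasks at all, or reported a non-empty set.
    public static boolean notReportedOrStillOwning(Optional<Set<String>> ownedTasks) {
        return ownedTasks.isEmpty() || !ownedTasks.get().isEmpty();
    }

    public static void main(String[] args) {
        // Owned tasks not provided in the heartbeat.
        System.out.println(notReportedOrStillOwning(Optional.empty()));
        // Owned tasks provided and non-empty.
        System.out.println(notReportedOrStillOwning(Optional.of(Set.of("task"))));
        // Owned tasks provided and empty: the only input for which the check is false.
        System.out.println(notReportedOrStillOwning(Optional.of(Set.of())));
    }
}
```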



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TaskTuple.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable tuple containing active, standby and warm-up tasks.
+ *
+ * @param activeTasks           Active tasks.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param standbyTasks          Standby tasks.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param warmupTasks           Warm-up tasks.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ */
+public record TaskTuple(Map<String, Set<Integer>> activeTasks,

Review Comment:
   I tend to agree. I would call it `TasksTuple`, though, since each position 
within the tuple consists of multiple tasks. But I leave it to you if you want 
to change that.
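
   A minimal sketch of why the plural fits, assuming the record keeps the three 
maps from the quoted declaration. The record below is simplified (it omits 
members such as `EMPTY` and `containsAny` that the quoted code uses), and 
`countTasks` is a hypothetical helper added only for this example.

```java
import java.util.Map;
import java.util.Set;

public class TasksTupleSketch {

    // Simplified sketch under the proposed name: each position maps a
    // subtopology ID to a set of partition IDs, so it holds many tasks.
    public record TasksTuple(Map<String, Set<Integer>> activeTasks,
                             Map<String, Set<Integer>> standbyTasks,
                             Map<String, Set<Integer>> warmupTasks) { }

    // Hypothetical helper: counts the tasks held in one position of the tuple.
    public static int countTasks(Map<String, Set<Integer>> tasks) {
        return tasks.values().stream().mapToInt(Set::size).sum();
    }

    public static void main(String[] args) {
        TasksTuple tuple = new TasksTuple(
            Map.of("subtopology-1", Set.of(1, 2), "subtopology-2", Set.of(3, 4)),
            Map.of("subtopology-1", Set.of(5)),
            Map.of());

        System.out.println("active: " + countTasks(tuple.activeTasks()));
        System.out.println("standby: " + countTasks(tuple.standbyTasks()));
        System.out.println("warm-up: " + countTasks(tuple.warmupTasks()));
    }
}
```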



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java:
##########
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskTuple;
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CurrentAssignmentBuilderTest {
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToStable(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member =
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.STABLE)
+                .setProcessId("process")
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(
+                    mkTaskTuple(
+                        taskRole,
+                        mkTasks(subtopologyId1, 1, 2),
+                        mkTasks(subtopologyId2, 3, 4)))
+                .setTasksPendingRevocation(TaskTuple.EMPTY)
+                .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2),
+                mkTasks(subtopologyId2, 3, 4)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> "process")
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.STABLE)
+                .setProcessId("process")
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)

Review Comment:
   You could at least use variable names like `memberEpoch` and 
`nextMemberEpoch` or similar to make it clearer. Just an idea! Again, I leave 
it to you whether you want to keep it as it is.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java:
##########
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskTuple;
+import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CurrentAssignmentBuilderTest {
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToStable(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member =
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.STABLE)
+                .setProcessId("process")
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(
+                    mkTaskTuple(
+                        taskRole,
+                        mkTasks(subtopologyId1, 1, 2),
+                        mkTasks(subtopologyId2, 3, 4)))
+                .setTasksPendingRevocation(TaskTuple.EMPTY)
+                .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2),
+                mkTasks(subtopologyId2, 3, 4)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> "process")
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.STABLE)
+                .setProcessId("process")
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(mkTaskTuple(
+                    taskRole,
+                    mkTasks(subtopologyId1, 1, 2),
+                    mkTasks(subtopologyId2, 3, 4)))
+                .setTasksPendingRevocation(TaskTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToStableWithNewTasks(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setState(MemberState.STABLE)
+            .setProcessId("process")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2),
+                mkTasks(subtopologyId2, 3, 4)))
+            .setTasksPendingRevocation(TaskTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2, 4),
+                mkTasks(subtopologyId2, 3, 4, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.STABLE)
+                .setProcessId("process")
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 1, 2, 4),
+                    mkTasks(subtopologyId2, 3, 4, 7)))
+                .setTasksPendingRevocation(TaskTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToUnrevokedTasks(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setState(MemberState.STABLE)
+            .setProcessId("process")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2),
+                mkTasks(subtopologyId2, 3, 4)))
+            .setTasksPendingRevocation(TaskTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 2, 3),
+                mkTasks(subtopologyId2, 4, 5)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.UNREVOKED_TASKS)
+                .setProcessId("process")
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 2),
+                    mkTasks(subtopologyId2, 4)))
+                .setTasksPendingRevocation(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 1),
+                    mkTasks(subtopologyId2, 3)))
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToUnreleasedTasks(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setState(MemberState.STABLE)
+            .setProcessId("process")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2),
+                mkTasks(subtopologyId2, 3, 4)))
+            .setTasksPendingRevocation(TaskTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2, 4),
+                mkTasks(subtopologyId2, 3, 4, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> "process")
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.UNRELEASED_TASKS)
+                .setProcessId("process")
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 1, 2),
+                    mkTasks(subtopologyId2, 3, 4)))
+                .setTasksPendingRevocation(TaskTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToUnreleasedTasksWithOwnedTasksNotHavingRevokedTasks(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setState(MemberState.STABLE)
+            .setProcessId("process")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2),
+                mkTasks(subtopologyId2, 3, 4)))
+            .setTasksPendingRevocation(TaskTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1, 2),
+                mkTasks(subtopologyId2, 3, 5)))
+            .withCurrentActiveTaskProcessId((subtopologyId, __) ->
+                subtopologyId2.equals(subtopologyId) ? "process" : null
+            )
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
+            .withOwnedAssignment(mkTaskTuple(taskRole))
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.UNRELEASED_TASKS)
+                .setProcessId("process")
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 1, 2),
+                    mkTasks(subtopologyId2, 3)))
+                .setTasksPendingRevocation(TaskTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnrevokedTasksToStable(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setProcessId("process")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 2, 3),
+                mkTasks(subtopologyId2, 5, 6)))
+            .setTasksPendingRevocation(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1),
+                mkTasks(subtopologyId2, 4)))
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(11, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 2, 3),
+                mkTasks(subtopologyId2, 5, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet())
+            .withOwnedAssignment(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 2, 3),
+                mkTasks(subtopologyId2, 5, 6)))
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.STABLE)
+                .setProcessId("process")
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 2, 3),
+                    mkTasks(subtopologyId2, 5, 6)))
+                .setTasksPendingRevocation(TaskTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testRemainsInUnrevokedTasks(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setProcessId("process")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 2, 3),
+                mkTasks(subtopologyId2, 5, 6)))
+            .setTasksPendingRevocation(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1),
+                mkTasks(subtopologyId2, 4)))
+            .build();
+
+        CurrentAssignmentBuilder currentAssignmentBuilder = new CurrentAssignmentBuilder(
+            member)
+            .withTargetAssignment(12, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 3),
+                mkTasks(subtopologyId2, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet());
+
+        assertEquals(
+            member,
+            currentAssignmentBuilder
+                .withOwnedAssignment(null)
+                .build()
+        );
+
+        assertEquals(
+            member,
+            currentAssignmentBuilder
+                .withOwnedAssignment(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 1, 2, 3),
+                    mkTasks(subtopologyId2, 5, 6)))
+                .build()
+        );
+
+        assertEquals(
+            member,
+            currentAssignmentBuilder
+                .withOwnedAssignment(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 2, 3),
+                    mkTasks(subtopologyId2, 4, 5, 6)))
+                .build()
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnrevokedTasksToUnrevokedTasks(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setProcessId("process")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 2, 3),
+                mkTasks(subtopologyId2, 5, 6)))
+            .setTasksPendingRevocation(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 1),
+                mkTasks(subtopologyId2, 4)))
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(12, mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 3),
+                mkTasks(subtopologyId2, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
+            .withOwnedAssignment(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 2, 3),
+                mkTasks(subtopologyId2, 5, 6)))
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder("member")
+                .setState(MemberState.UNREVOKED_TASKS)
+                .setProcessId("process")
+                .setMemberEpoch(11)
+                .setPreviousMemberEpoch(10)
+                .setAssignedTasks(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 3),
+                    mkTasks(subtopologyId2, 6)))
+                .setTasksPendingRevocation(mkTaskTuple(taskRole,
+                    mkTasks(subtopologyId1, 2),
+                    mkTasks(subtopologyId2, 5)))
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnrevokedTasksToUnreleasedTasks(TaskRole taskRole) {
+        String subtopologyId1 = Uuid.randomUuid().toString();
+        String subtopologyId2 = Uuid.randomUuid().toString();
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setProcessId("process")
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setAssignedTasks(mkTaskTuple(taskRole,
+                mkTasks(subtopologyId1, 2, 3),
+                mkTasks(subtopologyId2, 5, 6)))
+            .setTasksPendingRevocation(TaskTuple.EMPTY)

Review Comment:
   Did you forget to add this change? This line is still `.setTasksPendingRevocation(TaskTuple.EMPTY)`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TaskTuple.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable tuple containing active, standby and warm-up tasks.
+ *
+ * @param activeTasks           Active tasks.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param standbyTasks          Standby tasks.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param warmupTasks           Warm-up tasks.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ */
+public record TaskTuple(Map<String, Set<Integer>> activeTasks,
+                        Map<String, Set<Integer>> standbyTasks,
+                        Map<String, Set<Integer>> warmupTasks) {
+
+    public TaskTuple {
+        activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
+        standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
+        warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));

Review Comment:
   OK
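
   For reference, a minimal sketch of what the quoted compact constructor does, assuming a hypothetical one-field record `Tasks` in place of `TaskTuple` (the names `TaskTupleSketch`, `Tasks`, `rejectsNull` and `rejectsWrites` are illustrative only and not part of the PR): a null map is rejected by `Objects.requireNonNull`, and writes through the accessor fail because `Collections.unmodifiableMap` returns a read-only view.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class TaskTupleSketch {

    // Hypothetical single-field stand-in for the quoted TaskTuple record.
    record Tasks(Map<String, Set<Integer>> activeTasks) {
        Tasks {
            // Same pattern as the quoted constructor: null check, then read-only wrapper.
            activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
        }
    }

    // Returns true if constructing the record with a null map throws a NullPointerException.
    static boolean rejectsNull() {
        try {
            new Tasks(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Returns true if writing through the accessor throws an UnsupportedOperationException.
    static boolean rejectsWrites() {
        Map<String, Set<Integer>> source = new HashMap<>();
        source.put("subtopology-1", Set.of(1, 2));
        Tasks tasks = new Tasks(source);
        try {
            tasks.activeTasks().put("subtopology-2", Set.of(3));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull());   // prints true
        System.out.println(rejectsWrites()); // prints true
    }
}
```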



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a
+ * member and a desired or target assignment state, the state machine takes the necessary steps to converge them.
+ */
+public class CurrentAssignmentBuilder {
+
+    /**
+     * The streams group member which is reconciled.
+     */
+    private final StreamsGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private TaskTuple targetAssignment;
+
+    /**
+     * A function which returns the current process ID of an active task or null if the active task
+     * is not assigned. The current process ID is the process ID of the current owner.
+     */
+    private BiFunction<String, Integer, String> currentActiveTaskProcessId;
+
+    /**
+     * A function which returns the current process IDs of a standby task or null if the standby
+     * task is not assigned. The current process IDs are the process IDs of all current owners.
+     */
+    private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds;
+
+    /**
+     * A function which returns the current process IDs of a warmup task or null if the warmup task
+     * is not assigned. The current process IDs are the process IDs of all current owners.
+     */
+    private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds;
+
+    /**
+     * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request.
+     */
+    private Optional<TaskTuple> ownedTasks = Optional.empty();
+
+    /**
+     * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member.
+     *
+     * @param member The streams group member that must be reconciled.
+     */
+    public CurrentAssignmentBuilder(StreamsGroupMember member) {
+        this.member = Objects.requireNonNull(member);
+    }
+
+    /**
+     * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to.
+     *
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withTargetAssignment(
+        int targetAssignmentEpoch,
+        TaskTuple targetAssignment
+    ) {
+        this.targetAssignmentEpoch = targetAssignmentEpoch;
+        this.targetAssignment = Objects.requireNonNull(targetAssignment);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction that retrieves the current process ID of an active task. This is
+     * used by the state machine to determine if an active task is free or still used by another
+     * member, and if there is still a task on a specific process that is not yet revoked.
+     *
+     * @param currentActiveTaskProcessId A BiFunction which gets the process ID of a subtopology ID /
+     *                                   partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentActiveTaskProcessId(
+        BiFunction<String, Integer, String> currentActiveTaskProcessId
+    ) {
+        this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction that retrieves the current process IDs of a standby task. This is
+     * used by the state machine to determine if there is still a task on a specific process that is
+     * not yet revoked.
+     *
+     * @param currentStandbyTaskProcessIds A BiFunction which gets the process IDs of a subtopology
+     *                                     ID / partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds(
+        BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds
+    ) {
+        this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction that retrieves the current process IDs of a warmup task. This is
+     * used by the state machine to determine if there is still a task on a specific process that is
+     * not yet revoked.
+     *
+     * @param currentWarmupTaskProcessIds A BiFunction which gets the process IDs of a subtopology ID
+     *                                    / partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds(
+        BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds
+    ) {
+        this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds);
+        return this;
+    }
+
+    /**
+     * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to
+     * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide
+     * its owned tasks in this heartbeat.
+     *
+     * @param ownedAssignment A collection of active, standby and warm-up tasks
+     * @return This object.
+     */
+    protected CurrentAssignmentBuilder withOwnedAssignment(
+        TaskTuple ownedAssignment
+    ) {
+        this.ownedTasks = Optional.ofNullable(ownedAssignment);
+        return this;
+    }
+
+    /**
+     * Builds the next state for the member or keeps the current one if it is not possible to move forward with the current state.
+     *
+     * @return A new StreamsGroupMember or the current one.
+     */
+    public StreamsGroupMember build() {
+        switch (member.state()) {
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {

Review Comment:
   Good!



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a
+ * member and a desired or target assignment state, the state machine takes the necessary steps to converge them.
+ */
+public class CurrentAssignmentBuilder {
+
+    /**
+     * The streams group member which is reconciled.
+     */
+    private final StreamsGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private TaskTuple targetAssignment;
+
+    /**
+     * A function which returns the current process ID of an active task or null if the active task
+     * is not assigned. The current process ID is the process ID of the current owner.
+     */
+    private BiFunction<String, Integer, String> currentActiveTaskProcessId;
+
+    /**
+     * A function which returns the current process IDs of a standby task or null if the standby
+     * task is not assigned. The current process IDs are the process IDs of all current owners.
+     */
+    private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds;
+
+    /**
+     * A function which returns the current process IDs of a warmup task or null if the warmup task
+     * is not assigned. The current process IDs are the process IDs of all current owners.
+     */
+    private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds;
+
+    /**
+     * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request.
+     */
+    private Optional<TaskTuple> ownedTasks = Optional.empty();
+
+    /**
+     * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member.
+     *
+     * @param member The streams group member that must be reconciled.
+     */
+    public CurrentAssignmentBuilder(StreamsGroupMember member) {
+        this.member = Objects.requireNonNull(member);
+    }
+
+    /**
+     * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to.
+     *
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withTargetAssignment(
+        int targetAssignmentEpoch,
+        TaskTuple targetAssignment
+    ) {
+        this.targetAssignmentEpoch = targetAssignmentEpoch;
+        this.targetAssignment = Objects.requireNonNull(targetAssignment);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction that retrieves the current process ID of an active task. This is
+     * used by the state machine to determine if an active task is free or still used by another
+     * member, and if there is still a task on a specific process that is not yet revoked.
+     *
+     * @param currentActiveTaskProcessId A BiFunction which gets the process ID of a subtopology ID /
+     *                                   partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentActiveTaskProcessId(
+        BiFunction<String, Integer, String> currentActiveTaskProcessId
+    ) {
+        this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction that retrieves the current process IDs of a standby task. This is
+     * used by the state machine to determine if there is still a task on a specific process that is
+     * not yet revoked.
+     *
+     * @param currentStandbyTaskProcessIds A BiFunction which gets the process IDs of a subtopology
+     *                                     ID / partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds(
+        BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds
+    ) {
+        this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction that retrieves the current process IDs of a warmup task. This is
+     * used by the state machine to determine if there is still a task on a specific process that is
+     * not yet revoked.
+     *
+     * @param currentWarmupTaskProcessIds A BiFunction which gets the process IDs of a subtopology ID
+     *                                    / partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds(
+        BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds
+    ) {
+        this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds);
+        return this;
+    }
+
+    /**
+     * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to
+     * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide
+     * its owned tasks in this heartbeat.
+     *
+     * @param ownedAssignment A collection of active, standby and warm-up tasks
+     * @return This object.
+     */
+    protected CurrentAssignmentBuilder withOwnedAssignment(
+        TaskTuple ownedAssignment
+    ) {
+        this.ownedTasks = Optional.ofNullable(ownedAssignment);
+        return this;
+    }
+
+    /**
+     * Builds the next state for the member or keeps the current one if it is not possible to move forward with the current state.
+     *
+     * @return A new StreamsGroupMember or the current one.
+     */
+    public StreamsGroupMember build() {
+        switch (member.state()) {
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedTasks()
+                    );
+                } else {
+                    return member;
+                }
+
+            case UNREVOKED_TASKS:
+                // When the member is in the UNREVOKED_TASKS state, we wait
+                // until the member has revoked the necessary tasks. They are
+                // considered revoked when they are no longer reported in the
+                // owned tasks set in the StreamsGroupHeartbeat API.
+
+                // If the member provides its owned tasks, we verify if it still
+                // owns any of the revoked tasks. If it did not provide its
+                // owned tasks, or it still owns some of the revoked tasks, we
+                // cannot progress.
+                if (
+                    ownedTasks.isEmpty() || ownedTasks.get().containsAny(member.tasksPendingRevocation())
+                ) {
+                    return member;
+                }
+
+                // When the member has revoked all the pending tasks, it can
+                // transition to the next epoch (current + 1) and we can reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,

Review Comment:
   Ah, that makes sense! I really missed something. 
   Thanks for the explanation!
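
   For reference, a minimal sketch of the gate in the quoted `UNREVOKED_TASKS` branch, under simplifying assumptions: tasks are modelled as a single map from subtopology ID to partition IDs instead of a full `TaskTuple`, and `UnrevokedTasksSketch`, `containsAny` and `nextEpoch` are hypothetical names that only mirror the check and the `memberEpoch + 1` argument shown in the diff. The actual `computeNextAssignment` may do more than this.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the UNREVOKED_TASKS step; not the PR's implementation.
final class UnrevokedTasksSketch {

    // Returns true if any (subtopology, partition) pair in `pending` is still present in `owned`.
    static boolean containsAny(Map<String, Set<Integer>> owned, Map<String, Set<Integer>> pending) {
        for (Map.Entry<String, Set<Integer>> entry : pending.entrySet()) {
            Set<Integer> ownedPartitions = owned.get(entry.getKey());
            if (ownedPartitions == null) {
                continue;
            }
            for (Integer partition : entry.getValue()) {
                if (ownedPartitions.contains(partition)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Returns the epoch used for the next reconciliation step.
    // `owned` is null when the heartbeat did not report owned tasks.
    static int nextEpoch(int memberEpoch, Map<String, Set<Integer>> owned, Map<String, Set<Integer>> pending) {
        if (owned == null || containsAny(owned, pending)) {
            return memberEpoch; // the member cannot progress yet
        }
        return memberEpoch + 1; // all pending tasks were revoked
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> pending = Map.of("sub1", Set.of(1), "sub2", Set.of(4));

        // Owned tasks not reported: stays at epoch 10.
        System.out.println(nextEpoch(10, null, pending)); // prints 10

        // Still owns task 1 of sub1, which is pending revocation: stays at epoch 10.
        System.out.println(nextEpoch(10, Map.of("sub1", Set.of(1, 2, 3), "sub2", Set.of(5, 6)), pending)); // prints 10

        // No pending task is owned any more: moves to epoch 11.
        System.out.println(nextEpoch(10, Map.of("sub1", Set.of(2, 3), "sub2", Set.of(5, 6)), pending)); // prints 11
    }
}
```

   The three calls in `main` correspond loosely to the inputs used in `testRemainsInUnrevokedTasks` and `testUnrevokedTasksToStable` above.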



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
