lucasbru commented on code in PR #18476: URL: https://github.com/apache/kafka/pull/18476#discussion_r1922191790
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TaskTuple.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * An immutable tuple containing active, standby and warm-up tasks. + * + * @param activeTasks Active tasks. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param standbyTasks Standby tasks. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param warmupTasks Warm-up tasks. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + */ +public record TaskTuple(Map<String, Set<Integer>> activeTasks, Review Comment: I considered this, but in the Javadoc it turned out to be very confusing.
For example, ```A new task tuple, containing all active tasks, standby tasks and warm-up tasks from both tuples.``` would become ```A new [[Tasks]], containing all active tasks, standby tasks and warm-up tasks from both given [[Tasks]].``` I'll keep the tuple name for now. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.errors.FencedMemberEpochException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a + * member and a desired or target assignment state, the state machine takes the necessary steps to converge them. + */ +public class CurrentAssignmentBuilder { + + /** + * The streams group member which is reconciled.
+ */ + private final StreamsGroupMember member; + + /** + * The target assignment epoch. + */ + private int targetAssignmentEpoch; + + /** + * The target assignment. + */ + private TaskTuple targetAssignment; + + /** + * A function which returns the current process ID of an active task or null if the active task + * is not assigned. The current process ID is the process ID of the current owner. + */ + private BiFunction<String, Integer, String> currentActiveTaskProcessId; + + /** + * A function which returns the current process IDs of a standby task or null if the standby + * task is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds; + + /** + * A function which returns the current process IDs of a warmup task or null if the warmup task + * is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds; + + /** + * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request. + */ + private Optional<TaskTuple> ownedTasks = Optional.empty(); + + /** + * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member. + * + * @param member The streams group member that must be reconciled. + */ + public CurrentAssignmentBuilder(StreamsGroupMember member) { + this.member = Objects.requireNonNull(member); + } + + /** + * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to. + * + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @return This object. 
+ */ + public CurrentAssignmentBuilder withTargetAssignment( + int targetAssignmentEpoch, + TaskTuple targetAssignment + ) { Review Comment: Done ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java: ########## @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.Collections; + +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskTuple; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class CurrentAssignmentBuilderTest { + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToStable(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = + new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks( + mkTaskTuple( + taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(); + + StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, mkTaskTuple(taskRole, Review Comment: Since you are asking - I don't think so. I find calculations in the test harder to read than just constants. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java: ########## @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.Collections; + +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskTuple; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class CurrentAssignmentBuilderTest { + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToStable(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = + new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks( + mkTaskTuple( + taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(); + + StreamsGroupMember updatedMember = new 
CurrentAssignmentBuilder(member) + .withTargetAssignment(11, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> "process") + .withCurrentStandbyTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()) + .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet()) + .build(); + + assertEquals( + new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) Review Comment: Since you are asking - I don't think so. I find calculations in the test harder to read than just constants. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java: ########## @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.Collections; + +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskTuple; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class CurrentAssignmentBuilderTest { + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToStable(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = + new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks( + mkTaskTuple( + taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(); + + StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> "process") + .withCurrentStandbyTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()) + .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet()) + .build(); + + assertEquals( + new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(11) + 
.setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple( + taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(), + updatedMember + ); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToStableWithNewTasks(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(); + + StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2, 4), + mkTasks(subtopologyId2, 3, 4, 7))) + .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null) + .withCurrentStandbyTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()) + .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet()) + .build(); + + assertEquals( + new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2, 4), + mkTasks(subtopologyId2, 3, 4, 7))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(), + updatedMember + ); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToUnrevokedTasks(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + 
.setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(); + + StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 4, 5))) + .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null) + .withCurrentStandbyTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()) + .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet()) + .build(); + + assertEquals( + new StreamsGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_TASKS) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2), + mkTasks(subtopologyId2, 4))) + .setTasksPendingRevocation(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1), + mkTasks(subtopologyId2, 3))) + .build(), + updatedMember + ); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToUnreleasedTasks(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(); + + StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2, 4), + mkTasks(subtopologyId2, 3, 4, 7))) + .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> "process") + 
.withCurrentStandbyTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()) + .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet()) + .build(); + + assertEquals( + new StreamsGroupMember.Builder("member") + .setState(MemberState.UNRELEASED_TASKS) + .setProcessId("process") + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(), + updatedMember + ); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToUnreleasedTasksWithOwnedTasksNotHavingRevokedTasks(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 4))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(); + + StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3, 5))) + .withCurrentActiveTaskProcessId((subtopologyId, __) -> + subtopologyId2.equals(subtopologyId) ? 
"process" : null + ) + .withCurrentStandbyTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()) + .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet()) + .withOwnedAssignment(mkTaskTuple(taskRole)) + .build(); + + assertEquals( + new StreamsGroupMember.Builder("member") + .setState(MemberState.UNRELEASED_TASKS) + .setProcessId("process") + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2), + mkTasks(subtopologyId2, 3))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(), + updatedMember + ); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testUnrevokedTasksToStable(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_TASKS) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .setTasksPendingRevocation(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1), + mkTasks(subtopologyId2, 4))) + .build(); + + StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(11, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null) + .withCurrentStandbyTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()) + .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet()) + .withOwnedAssignment(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .build(); + + assertEquals( + new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") + 
.setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .setTasksPendingRevocation(TaskTuple.EMPTY) + .build(), + updatedMember + ); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testRemainsInUnrevokedTasks(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_TASKS) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .setTasksPendingRevocation(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1), + mkTasks(subtopologyId2, 4))) + .build(); + + CurrentAssignmentBuilder currentAssignmentBuilder = new CurrentAssignmentBuilder( + member) + .withTargetAssignment(12, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 3), + mkTasks(subtopologyId2, 6))) + .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null) + .withCurrentStandbyTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()) + .withCurrentWarmupTaskProcessIds( + (subtopologyId, partitionId) -> Collections.emptySet()); + + assertEquals( + member, + currentAssignmentBuilder + .withOwnedAssignment(null) + .build() + ); + + assertEquals( + member, + currentAssignmentBuilder + .withOwnedAssignment(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .build() + ); + + assertEquals( + member, + currentAssignmentBuilder + .withOwnedAssignment(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 4, 5, 6))) + .build() + ); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testUnrevokedTasksToUnrevokedTasks(TaskRole taskRole) { + String subtopologyId1 = 
Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_TASKS) + .setProcessId("process") + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .setTasksPendingRevocation(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 1), + mkTasks(subtopologyId2, 4))) + .build(); + + StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member) + .withTargetAssignment(12, mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 3), + mkTasks(subtopologyId2, 6))) + .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null) + .withOwnedAssignment(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .build(); + + assertEquals( + new StreamsGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_TASKS) + .setProcessId("process") + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 3), + mkTasks(subtopologyId2, 6))) + .setTasksPendingRevocation(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2), + mkTasks(subtopologyId2, 5))) + .build(), + updatedMember + ); + } + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testUnrevokedTasksToUnreleasedTasks(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = new StreamsGroupMember.Builder("member") + .setState(MemberState.UNREVOKED_TASKS) + .setProcessId("process") + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedTasks(mkTaskTuple(taskRole, + mkTasks(subtopologyId1, 2, 3), + mkTasks(subtopologyId2, 5, 6))) + .setTasksPendingRevocation(TaskTuple.EMPTY) Review Comment: Done ########## 
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java: ########## @@ -17,37 +17,41 @@ package org.apache.kafka.coordinator.group.streams; import java.util.AbstractMap; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; public class TaskAssignmentTestUtil { - public static Assignment mkAssignment(final Map<String, Set<Integer>> activeTasks, - final Map<String, Set<Integer>> standbyTasks, - final Map<String, Set<Integer>> warmupTasks) { - return new Assignment( - Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)), - Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)), - Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)) - ); + public enum TaskRole { + ACTIVE, + STANDBY, + WARMUP + } + + @SafeVarargs + public static TaskTuple mkTaskTuple(TaskRole taskRole, Map.Entry<String, Set<Integer>>... entries) { + return switch (taskRole) { + case ACTIVE -> new TaskTuple(mkTasksPerSubtopology(entries), new HashMap<>(), new HashMap<>()); + case STANDBY -> new TaskTuple(new HashMap<>(), mkTasksPerSubtopology(entries), new HashMap<>()); + case WARMUP -> new TaskTuple(new HashMap<>(), new HashMap<>(), mkTasksPerSubtopology(entries)); + }; } - public static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId, - Integer... tasks) { + public static Map.Entry<String, Set<Integer>> mkTasks( + String subtopologyId, + Integer... tasks + ) { Review Comment: Not intentional; I just do it intuitively because I like it better. Most classes in the group coordinator seem to use this style, except for the ones you wrote. Anyway, reverted.
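The `TaskRole` / `mkTaskTuple` helpers in the `TaskAssignmentTestUtil` diff above let a single `@ParameterizedTest` with `@EnumSource(TaskRole.class)` cover active, standby and warm-up tasks: the role only decides which slot of the `TaskTuple` receives the tasks, and the other two maps stay empty. Below is a minimal standalone sketch of that pattern, not the PR's code. It assumes a simplified copy of the `TaskTuple` record shown in this PR; the class name `TaskTupleSketch` is made up, and because the diff does not show the body of `mkTasksPerSubtopology`, the version here is a hypothetical stand-in.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical standalone class for illustration; not part of the PR.
public class TaskTupleSketch {

    // Simplified copy of TaskTuple: per role, subtopology ID -> set of partition IDs.
    record TaskTuple(Map<String, Set<Integer>> activeTasks,
                     Map<String, Set<Integer>> standbyTasks,
                     Map<String, Set<Integer>> warmupTasks) {
        TaskTuple {
            activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
            standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
            warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
        }
    }

    enum TaskRole { ACTIVE, STANDBY, WARMUP }

    // Pairs one subtopology ID with a set of partition IDs.
    static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId, Integer... tasks) {
        return new AbstractMap.SimpleEntry<>(subtopologyId, new HashSet<>(Arrays.asList(tasks)));
    }

    // Hypothetical stand-in: collects the entries into a map keyed by subtopology ID.
    @SafeVarargs
    static Map<String, Set<Integer>> mkTasksPerSubtopology(Map.Entry<String, Set<Integer>>... entries) {
        Map<String, Set<Integer>> tasksPerSubtopology = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> entry : entries) {
            tasksPerSubtopology.put(entry.getKey(), entry.getValue());
        }
        return tasksPerSubtopology;
    }

    // Same shape as the helper in the diff: the role selects which slot is populated.
    @SafeVarargs
    static TaskTuple mkTaskTuple(TaskRole taskRole, Map.Entry<String, Set<Integer>>... entries) {
        return switch (taskRole) {
            case ACTIVE -> new TaskTuple(mkTasksPerSubtopology(entries), new HashMap<>(), new HashMap<>());
            case STANDBY -> new TaskTuple(new HashMap<>(), mkTasksPerSubtopology(entries), new HashMap<>());
            case WARMUP -> new TaskTuple(new HashMap<>(), new HashMap<>(), mkTasksPerSubtopology(entries));
        };
    }

    public static void main(String[] args) {
        // Build the same assignment once per role, as an @EnumSource(TaskRole.class) test would.
        for (TaskRole role : TaskRole.values()) {
            TaskTuple tuple = mkTaskTuple(role,
                mkTasks("subtopology-1", 1, 2),
                mkTasks("subtopology-2", 3, 4));
            System.out.printf("%s: active=%s standby=%s warmup=%s%n",
                role, tuple.activeTasks(), tuple.standbyTasks(), tuple.warmupTasks());
        }
    }
}
```

Because the role only selects the slot, each test body can stay identical across all three task types, which is how the `CurrentAssignmentBuilderTest` cases above are written.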
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.errors.FencedMemberEpochException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a + * member and a desired or target assignment state, the state machine takes the necessary steps to converge them. + */ +public class CurrentAssignmentBuilder { + + /** + * The streams group member which is reconciled. + */ + private final StreamsGroupMember member; + + /** + * The target assignment epoch. + */ + private int targetAssignmentEpoch; + + /** + * The target assignment. 
+ */ + private TaskTuple targetAssignment; + + /** + * A function which returns the current process ID of an active task or null if the active task + * is not assigned. The current process ID is the process ID of the current owner. + */ + private BiFunction<String, Integer, String> currentActiveTaskProcessId; + + /** + * A function which returns the current process IDs of a standby task or null if the standby + * task is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds; + + /** + * A function which returns the current process IDs of a warmup task or null if the warmup task + * is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds; + + /** + * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request. + */ + private Optional<TaskTuple> ownedTasks = Optional.empty(); + + /** + * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member. + * + * @param member The streams group member that must be reconciled. + */ + public CurrentAssignmentBuilder(StreamsGroupMember member) { + this.member = Objects.requireNonNull(member); + } + + /** + * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to. + * + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @return This object. + */ + public CurrentAssignmentBuilder withTargetAssignment( + int targetAssignmentEpoch, + TaskTuple targetAssignment + ) { + this.targetAssignmentEpoch = targetAssignmentEpoch; + this.targetAssignment = Objects.requireNonNull(targetAssignment); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process ID of an active task. 
This is + * used by the state machine to determine if an active task is free or still used by another + * member, and if there is still a task on a specific process that is not yet revoked. + * + * @param currentActiveTaskProcessId A BiFunction which gets the memberId of a subtopology id / + * partition id pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentActiveTaskProcessId( + BiFunction<String, Integer, String> currentActiveTaskProcessId + ) { + this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a standby task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentStandbyTaskProcessIds A BiFunction which gets the memberIds of a subtopology + * ids / partition ids pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds + ) { + this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a warmup task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentWarmupTaskProcessIds A BiFunction which gets the memberIds of a subtopology ids + * / partition ids pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds + ) { + this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds); + return this; + } + + /** + * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. 
This is used to + * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide + * its owned tasks in this heartbeat. + * + * @param ownedAssignment A collection of active, standby and warm-up tasks + * @return This object. + */ + protected CurrentAssignmentBuilder withOwnedAssignment( + TaskTuple ownedAssignment + ) { + this.ownedTasks = Optional.ofNullable(ownedAssignment); + return this; + } + + /** + * Builds the next state for the member or keep the current one if it is not possible to move forward with the current state. + * + * @return A new StreamsGroupMember or the current one. + */ + public StreamsGroupMember build() { + switch (member.state()) { + case STABLE: + // When the member is in the STABLE state, we verify if a newer + // epoch (or target assignment) is available. If it is, we can + // reconcile the member towards it. Otherwise, we return. + if (member.memberEpoch() != targetAssignmentEpoch) { Review Comment: No, this can be checked centrally when the heartbeat comes in and not again in every class. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TaskTuple.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * An immutable tuple containing active, standby and warm-up tasks. + * + * @param activeTasks Active tasks. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param standbyTasks Standby tasks. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param warmupTasks Warm-up tasks. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + */ +public record TaskTuple(Map<String, Set<Integer>> activeTasks, + Map<String, Set<Integer>> standbyTasks, + Map<String, Set<Integer>> warmupTasks) { + + public TaskTuple { + activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)); + standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)); + warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)); + } + + /** + * An empty task tuple. + */ + public static final TaskTuple EMPTY = new TaskTuple( + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + /** + * @return true if all collections in the tuple are empty. + */ + public boolean isEmpty() { + return activeTasks.isEmpty() && standbyTasks.isEmpty() && warmupTasks.isEmpty(); + } + + /** + * Merges this task tuple with another task tuple. + * + * @param other The other task tuple. + * @return A new task tuple, containing all active tasks, standby tasks and warm-up tasks from both tuples. 
+ */ + public TaskTuple merge(TaskTuple other) { + Map<String, Set<Integer>> mergedActiveTasks = merge(activeTasks, other.activeTasks); + Map<String, Set<Integer>> mergedStandbyTasks = merge(standbyTasks, other.standbyTasks); + Map<String, Set<Integer>> mergedWarmupTasks = merge(warmupTasks, other.warmupTasks); + return new TaskTuple(mergedActiveTasks, mergedStandbyTasks, mergedWarmupTasks); + } + + private Map<String, Set<Integer>> merge(final Map<String, Set<Integer>> tasks1, final Map<String, Set<Integer>> tasks2) { Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TaskTuple.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * An immutable tuple containing active, standby and warm-up tasks. + * + * @param activeTasks Active tasks. 
+ * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param standbyTasks Standby tasks. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + * @param warmupTasks Warm-up tasks. + * The key of the map is the subtopology ID and the value is the set of partition IDs. + */ +public record TaskTuple(Map<String, Set<Integer>> activeTasks, + Map<String, Set<Integer>> standbyTasks, + Map<String, Set<Integer>> warmupTasks) { + + public TaskTuple { + activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)); + standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)); + warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)); Review Comment: Yes ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.errors.FencedMemberEpochException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a + * member and a desired or target assignment state, the state machine takes the necessary steps to converge them. + */ +public class CurrentAssignmentBuilder { + + /** + * The streams group member which is reconciled. + */ + private final StreamsGroupMember member; + + /** + * The target assignment epoch. + */ + private int targetAssignmentEpoch; + + /** + * The target assignment. + */ + private TaskTuple targetAssignment; + + /** + * A function which returns the current process ID of an active task or null if the active task + * is not assigned. The current process ID is the process ID of the current owner. + */ + private BiFunction<String, Integer, String> currentActiveTaskProcessId; + + /** + * A function which returns the current process IDs of a standby task or null if the standby + * task is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds; + + /** + * A function which returns the current process IDs of a warmup task or null if the warmup task + * is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds; + + /** + * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request. 
+ */ + private Optional<TaskTuple> ownedTasks = Optional.empty(); + + /** + * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member. + * + * @param member The streams group member that must be reconciled. + */ + public CurrentAssignmentBuilder(StreamsGroupMember member) { + this.member = Objects.requireNonNull(member); + } + + /** + * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to. + * + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @return This object. + */ + public CurrentAssignmentBuilder withTargetAssignment( + int targetAssignmentEpoch, + TaskTuple targetAssignment + ) { + this.targetAssignmentEpoch = targetAssignmentEpoch; + this.targetAssignment = Objects.requireNonNull(targetAssignment); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process ID of an active task. This is + * used by the state machine to determine if an active task is free or still used by another + * member, and if there is still a task on a specific process that is not yet revoked. + * + * @param currentActiveTaskProcessId A BiFunction which gets the memberId of a subtopology id / + * partition id pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentActiveTaskProcessId( + BiFunction<String, Integer, String> currentActiveTaskProcessId + ) { + this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a standby task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentStandbyTaskProcessIds A BiFunction which gets the memberIds of a subtopology + * ids / partition ids pair. + * @return This object. 
+ */ + public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds + ) { + this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a warmup task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentWarmupTaskProcessIds A BiFunction which gets the memberIds of a subtopology ids + * / partition ids pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds + ) { + this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds); + return this; + } + + /** + * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to + * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide + * its owned tasks in this heartbeat. + * + * @param ownedAssignment A collection of active, standby and warm-up tasks + * @return This object. + */ + protected CurrentAssignmentBuilder withOwnedAssignment( + TaskTuple ownedAssignment + ) { + this.ownedTasks = Optional.ofNullable(ownedAssignment); + return this; + } + + /** + * Builds the next state for the member or keep the current one if it is not possible to move forward with the current state. + * + * @return A new StreamsGroupMember or the current one. + */ + public StreamsGroupMember build() { + switch (member.state()) { + case STABLE: + // When the member is in the STABLE state, we verify if a newer + // epoch (or target assignment) is available. If it is, we can + // reconcile the member towards it. Otherwise, we return. 
+ if (member.memberEpoch() != targetAssignmentEpoch) { + return computeNextAssignment( + member.memberEpoch(), + member.assignedTasks() + ); + } else { + return member; + } + + case UNREVOKED_TASKS: + // When the member is in the UNREVOKED_TASKS state, we wait + // until the member has revoked the necessary tasks. They are + // considered revoked when they are not anymore reported in the + // owned tasks set in the StreamsGroupHeartbeat API. + + // If the member provides its owned tasks, we verify if it still + // owns any of the revoked tasks. If it did not provide it's + // owned tasks, or we still own some of the revoked tasks, we + // cannot progress. + if ( + ownedTasks.isEmpty() || ownedTasks.get().containsAny(member.tasksPendingRevocation()) + ) { + return member; + } + + // When the member has revoked all the pending tasks, it can + // transition to the next epoch (current + 1) and we can reconcile + // its state towards the latest target assignment. + return computeNextAssignment( + member.memberEpoch() + 1, + member.assignedTasks() + ); + + case UNRELEASED_TASKS: + // When the member is in the UNRELEASED_TASKS, we reconcile the + // member towards the latest target assignment. This will assign any + // of the unreleased tasks when they become available. + return computeNextAssignment( + member.memberEpoch(), + member.assignedTasks() + ); + + case UNKNOWN: + // We could only end up in this state if a new state is added in the + // future and the group coordinator is downgraded. In this case, the + // best option is to fence the member to force it to rejoin the group + // without any tasks and to reconcile it again from scratch. + if ((ownedTasks.isEmpty() || !ownedTasks.get().isEmpty())) { Review Comment: If we know that the client does not own any tasks (and this is the only case where we know it, because we do not store the owned tasks), we already have what the rejoin is aiming to achieve (no owned tasks), so there is no point in fencing. 
This comes from the original current assignment builder. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.errors.FencedMemberEpochException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a + * member and a desired or target assignment state, the state machine takes the necessary steps to converge them. + */ +public class CurrentAssignmentBuilder { + + /** + * The streams group member which is reconciled. + */ + private final StreamsGroupMember member; + + /** + * The target assignment epoch. + */ + private int targetAssignmentEpoch; + + /** + * The target assignment. 
+ */ + private TaskTuple targetAssignment; + + /** + * A function which returns the current process ID of an active task or null if the active task + * is not assigned. The current process ID is the process ID of the current owner. + */ + private BiFunction<String, Integer, String> currentActiveTaskProcessId; + + /** + * A function which returns the current process IDs of a standby task or null if the standby + * task is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds; + + /** + * A function which returns the current process IDs of a warmup task or null if the warmup task + * is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds; + + /** + * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request. + */ + private Optional<TaskTuple> ownedTasks = Optional.empty(); + + /** + * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member. + * + * @param member The streams group member that must be reconciled. + */ + public CurrentAssignmentBuilder(StreamsGroupMember member) { + this.member = Objects.requireNonNull(member); + } + + /** + * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to. + * + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @return This object. + */ + public CurrentAssignmentBuilder withTargetAssignment( + int targetAssignmentEpoch, + TaskTuple targetAssignment + ) { + this.targetAssignmentEpoch = targetAssignmentEpoch; + this.targetAssignment = Objects.requireNonNull(targetAssignment); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process ID of an active task. 
This is + * used by the state machine to determine if an active task is free or still used by another + * member, and if there is still a task on a specific process that is not yet revoked. + * + * @param currentActiveTaskProcessId A BiFunction which gets the memberId of a subtopology id / Review Comment: Done ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java: ########## @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.Collections; + +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskTuple; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class CurrentAssignmentBuilderTest { + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToStable(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); + + StreamsGroupMember member = + new StreamsGroupMember.Builder("member") + .setState(MemberState.STABLE) + .setProcessId("process") Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.errors.FencedMemberEpochException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a + * member and a desired or target assignment state, the state machine takes the necessary steps to converge them. + */ +public class CurrentAssignmentBuilder { + + /** + * The streams group member which is reconciled. + */ + private final StreamsGroupMember member; + + /** + * The target assignment epoch. + */ + private int targetAssignmentEpoch; + + /** + * The target assignment. + */ + private TaskTuple targetAssignment; + + /** + * A function which returns the current process ID of an active task or null if the active task + * is not assigned. The current process ID is the process ID of the current owner. + */ + private BiFunction<String, Integer, String> currentActiveTaskProcessId; + + /** + * A function which returns the current process IDs of a standby task or null if the standby + * task is not assigned. The current process IDs are the process IDs of all current owners. 
+ */ + private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds; + + /** + * A function which returns the current process IDs of a warmup task or null if the warmup task + * is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds; + + /** + * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request. + */ + private Optional<TaskTuple> ownedTasks = Optional.empty(); + + /** + * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member. + * + * @param member The streams group member that must be reconciled. + */ + public CurrentAssignmentBuilder(StreamsGroupMember member) { + this.member = Objects.requireNonNull(member); + } + + /** + * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to. + * + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @return This object. + */ + public CurrentAssignmentBuilder withTargetAssignment( + int targetAssignmentEpoch, + TaskTuple targetAssignment + ) { + this.targetAssignmentEpoch = targetAssignmentEpoch; + this.targetAssignment = Objects.requireNonNull(targetAssignment); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process ID of an active task. This is + * used by the state machine to determine if an active task is free or still used by another + * member, and if there is still a task on a specific process that is not yet revoked. + * + * @param currentActiveTaskProcessId A BiFunction which gets the memberId of a subtopology id / + * partition id pair. + * @return This object. 
+ */ + public CurrentAssignmentBuilder withCurrentActiveTaskProcessId( + BiFunction<String, Integer, String> currentActiveTaskProcessId + ) { + this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a standby task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentStandbyTaskProcessIds A BiFunction which gets the memberIds of a subtopology + * ids / partition ids pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds + ) { + this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a warmup task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentWarmupTaskProcessIds A BiFunction which gets the memberIds of a subtopology ids + * / partition ids pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds + ) { + this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds); + return this; + } + + /** + * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to + * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide + * its owned tasks in this heartbeat. + * + * @param ownedAssignment A collection of active, standby and warm-up tasks + * @return This object. 
+ */ + protected CurrentAssignmentBuilder withOwnedAssignment( + TaskTuple ownedAssignment + ) { + this.ownedTasks = Optional.ofNullable(ownedAssignment); + return this; + } + + /** + * Builds the next state for the member or keep the current one if it is not possible to move forward with the current state. + * + * @return A new StreamsGroupMember or the current one. + */ + public StreamsGroupMember build() { + switch (member.state()) { + case STABLE: + // When the member is in the STABLE state, we verify if a newer + // epoch (or target assignment) is available. If it is, we can + // reconcile the member towards it. Otherwise, we return. + if (member.memberEpoch() != targetAssignmentEpoch) { + return computeNextAssignment( + member.memberEpoch(), + member.assignedTasks() + ); + } else { + return member; + } + + case UNREVOKED_TASKS: + // When the member is in the UNREVOKED_TASKS state, we wait + // until the member has revoked the necessary tasks. They are + // considered revoked when they are not anymore reported in the + // owned tasks set in the StreamsGroupHeartbeat API. + + // If the member provides its owned tasks, we verify if it still + // owns any of the revoked tasks. If it did not provide it's + // owned tasks, or we still own some of the revoked tasks, we + // cannot progress. + if ( + ownedTasks.isEmpty() || ownedTasks.get().containsAny(member.tasksPendingRevocation()) + ) { + return member; + } + + // When the member has revoked all the pending tasks, it can + // transition to the next epoch (current + 1) and we can reconcile + // its state towards the latest target assignment. + return computeNextAssignment( + member.memberEpoch() + 1, + member.assignedTasks() + ); + + case UNRELEASED_TASKS: + // When the member is in the UNRELEASED_TASKS, we reconcile the + // member towards the latest target assignment. This will assign any + // of the unreleased tasks when they become available. 
+ return computeNextAssignment( + member.memberEpoch(), + member.assignedTasks() + ); + + case UNKNOWN: + // We could only end up in this state if a new state is added in the + // future and the group coordinator is downgraded. In this case, the + // best option is to fence the member to force it to rejoin the group + // without any tasks and to reconcile it again from scratch. + if ((ownedTasks.isEmpty() || !ownedTasks.get().isEmpty())) { + throw new FencedMemberEpochException( + "The streams group member is in a unknown state. " + + "The member must abandon all its tasks and rejoin."); + } + + return computeNextAssignment( + targetAssignmentEpoch, + member.assignedTasks() + ); + } + + return member; + } + + /** + * Takes the current currentAssignment and the targetAssignment, and generates three + * collections: + * + * - the resultAssignedTasks: the tasks that are assigned in both the current and target + * assignments. + * - the resultTasksPendingRevocation: the tasks that are assigned in the current + * assignment but not in the target assignment. + * - the resultTasksPendingAssignment: the tasks that are assigned in the target assignment but + * not in the current assignment, and can be assigned currently (i.e., they are not owned by + * another member, as defined by the `isUnreleasedTask` predicate). 
+ */ + private boolean computeAssignmentDifference( + Map<String, Set<Integer>> currentAssignment, + Map<String, Set<Integer>> targetAssignment, + Map<String, Set<Integer>> resultAssignedTasks, + Map<String, Set<Integer>> resultTasksPendingRevocation, + Map<String, Set<Integer>> resultTasksPendingAssignment, + BiPredicate<String, Integer> isUnreleasedTask + ) { + boolean hasUnreleasedTasks = false; + + Set<String> allSubtopologyIds = new HashSet<>(targetAssignment.keySet()); + allSubtopologyIds.addAll(currentAssignment.keySet()); + + for (String subtopologyId : allSubtopologyIds) { + Set<Integer> currentPartitions = currentAssignment.getOrDefault(subtopologyId, Collections.emptySet()); + Set<Integer> targetPartitions = targetAssignment.getOrDefault(subtopologyId, Collections.emptySet()); + + // Result Assigned Partitions = Current Partitions ∩ Target Partitions + // i.e. we remove all partitions from the current assignment that are not in the target + // assignment + Set<Integer> resultAssignedPartitions = new HashSet<>(currentPartitions); + resultAssignedPartitions.retainAll(targetPartitions); + + // Result Partitions Pending Revocation = Current Partitions - Result Assigned Partitions + // i.e. we will ask the member to revoke all partitions in its current assignment that + // are not in the target assignment + Set<Integer> resultPartitionsPendingRevocation = new HashSet<>(currentPartitions); + resultPartitionsPendingRevocation.removeAll(resultAssignedPartitions); + + // Result Partitions Pending Assignment = Target Partitions - Result Assigned Partitions - Unreleased Partitions + // i.e. 
we will ask the member to assign all partitions in its target assignment, + // except those that are already assigned, and those that are unrelead Review Comment: Done ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java: ########## @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.Collections; + +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskTuple; +import static org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class CurrentAssignmentBuilderTest { + + @ParameterizedTest + @EnumSource(TaskRole.class) + public void testStableToStable(TaskRole taskRole) { + String subtopologyId1 = Uuid.randomUuid().toString(); + String subtopologyId2 = Uuid.randomUuid().toString(); Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.errors.FencedMemberEpochException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a + * member and a desired or target assignment state, the state machine takes the necessary steps to converge them. + */ +public class CurrentAssignmentBuilder { + + /** + * The streams group member which is reconciled. + */ + private final StreamsGroupMember member; + + /** + * The target assignment epoch. + */ + private int targetAssignmentEpoch; + + /** + * The target assignment. + */ + private TaskTuple targetAssignment; + + /** + * A function which returns the current process ID of an active task or null if the active task + * is not assigned. The current process ID is the process ID of the current owner. + */ + private BiFunction<String, Integer, String> currentActiveTaskProcessId; + + /** + * A function which returns the current process IDs of a standby task or null if the standby + * task is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds; + + /** + * A function which returns the current process IDs of a warmup task or null if the warmup task + * is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds; + + /** + * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request. 
+ */ + private Optional<TaskTuple> ownedTasks = Optional.empty(); + + /** + * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member. + * + * @param member The streams group member that must be reconciled. + */ + public CurrentAssignmentBuilder(StreamsGroupMember member) { + this.member = Objects.requireNonNull(member); + } + + /** + * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to. + * + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @return This object. + */ + public CurrentAssignmentBuilder withTargetAssignment( + int targetAssignmentEpoch, + TaskTuple targetAssignment + ) { + this.targetAssignmentEpoch = targetAssignmentEpoch; + this.targetAssignment = Objects.requireNonNull(targetAssignment); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process ID of an active task. This is + * used by the state machine to determine if an active task is free or still used by another + * member, and if there is still a task on a specific process that is not yet revoked. + * + * @param currentActiveTaskProcessId A BiFunction which gets the memberId of a subtopology id / + * partition id pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentActiveTaskProcessId( + BiFunction<String, Integer, String> currentActiveTaskProcessId + ) { + this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a standby task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentStandbyTaskProcessIds A BiFunction which gets the memberIds of a subtopology + * ids / partition ids pair. + * @return This object. 
+ */ + public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds + ) { + this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a warmup task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentWarmupTaskProcessIds A BiFunction which gets the memberIds of a subtopology ids + * / partition ids pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds + ) { + this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds); + return this; + } + + /** + * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to + * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide + * its owned tasks in this heartbeat. + * + * @param ownedAssignment A collection of active, standby and warm-up tasks + * @return This object. + */ + protected CurrentAssignmentBuilder withOwnedAssignment( + TaskTuple ownedAssignment + ) { + this.ownedTasks = Optional.ofNullable(ownedAssignment); + return this; + } + + /** + * Builds the next state for the member or keep the current one if it is not possible to move forward with the current state. + * + * @return A new StreamsGroupMember or the current one. + */ + public StreamsGroupMember build() { + switch (member.state()) { + case STABLE: + // When the member is in the STABLE state, we verify if a newer + // epoch (or target assignment) is available. If it is, we can + // reconcile the member towards it. Otherwise, we return. 
+ if (member.memberEpoch() != targetAssignmentEpoch) { + return computeNextAssignment( + member.memberEpoch(), + member.assignedTasks() + ); + } else { + return member; + } + + case UNREVOKED_TASKS: + // When the member is in the UNREVOKED_TASKS state, we wait + // until the member has revoked the necessary tasks. They are + // considered revoked when they are not anymore reported in the + // owned tasks set in the StreamsGroupHeartbeat API. + + // If the member provides its owned tasks, we verify if it still + // owns any of the revoked tasks. If it did not provide it's + // owned tasks, or we still own some of the revoked tasks, we + // cannot progress. + if ( + ownedTasks.isEmpty() || ownedTasks.get().containsAny(member.tasksPendingRevocation()) + ) { + return member; + } + + // When the member has revoked all the pending tasks, it can + // transition to the next epoch (current + 1) and we can reconcile + // its state towards the latest target assignment. + return computeNextAssignment( + member.memberEpoch() + 1, Review Comment: Good comment! Interesting idea, but I think you are missing something. Note that `member.tasksPendingRevocation` was defined in a previous heartbeat, which may have had a different target assignment than the latest one. So here, all we know is that we can transition to the next target assignment, because we revoked all the tasks that we needed to revoke according to that previous target assignment. This doesn't imply much about what will happen in `buildNewMember`, because there we use the new target assignment, which may require other tasks to be revoked. So if we find out that we actually need to revoke more tasks to reach the target assignment, we will use the result of `member.memberEpoch() + 1`.
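To make the epoch argument in the reply above concrete, here is a minimal sketch under a simplified model: only active tasks exist, unreleased tasks are ignored, and one reconciliation step either keeps the member at the epoch it is given (when tasks still have to be revoked) or moves it to the target epoch. The class `EpochBumpSketch`, its `Member` record and the `computeNext`, `intersect` and `minus` helpers are hypothetical and not part of the PR; they only mirror the behavior described in the comment.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of one reconciliation step (not the PR's code).
// Assumptions: only active tasks are modeled and unreleased tasks are ignored.
final class EpochBumpSketch {

    record Member(String state, int epoch,
                  Map<String, Set<Integer>> assigned,
                  Map<String, Set<Integer>> pendingRevocation) { }

    // Per subtopology: partitions present in both maps (left ∩ right); empty sets are dropped.
    static Map<String, Set<Integer>> intersect(Map<String, Set<Integer>> left,
                                               Map<String, Set<Integer>> right) {
        Map<String, Set<Integer>> result = new HashMap<>();
        left.forEach((subtopologyId, partitions) -> {
            Set<Integer> kept = new HashSet<>(partitions);
            kept.retainAll(right.getOrDefault(subtopologyId, Set.of()));
            if (!kept.isEmpty()) {
                result.put(subtopologyId, kept);
            }
        });
        return result;
    }

    // Per subtopology: partitions in left but not in right (left - right); empty sets are dropped.
    static Map<String, Set<Integer>> minus(Map<String, Set<Integer>> left,
                                           Map<String, Set<Integer>> right) {
        Map<String, Set<Integer>> result = new HashMap<>();
        left.forEach((subtopologyId, partitions) -> {
            Set<Integer> remaining = new HashSet<>(partitions);
            remaining.removeAll(right.getOrDefault(subtopologyId, Set.of()));
            if (!remaining.isEmpty()) {
                result.put(subtopologyId, remaining);
            }
        });
        return result;
    }

    // If anything must still be revoked, the member keeps the epoch it was given and
    // waits in UNREVOKED_TASKS; otherwise it moves to the target epoch and assignment.
    static Member computeNext(int memberEpoch, Map<String, Set<Integer>> current,
                              int targetEpoch, Map<String, Set<Integer>> target) {
        Map<String, Set<Integer>> pending = minus(current, target);
        if (!pending.isEmpty()) {
            return new Member("UNREVOKED_TASKS", memberEpoch, intersect(current, target), pending);
        }
        return new Member("STABLE", targetEpoch, target, Map.of());
    }
}
```

Walking through it: a member at epoch 10 that owns partitions 0 and 1 of subtopology `s1` is asked to revoke partition 1 for target epoch 11. By the time it has done so, the target has moved on to epoch 12, which also requires revoking partition 0, so the member is held at epoch 11 (`memberEpoch() + 1`) instead of jumping straight to 12. Only after that second revocation does it become STABLE at epoch 12.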
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java: ########## @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.errors.FencedMemberEpochException; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a + * member and a desired or target assignment state, the state machine takes the necessary steps to converge them. + */ +public class CurrentAssignmentBuilder { + + /** + * The streams group member which is reconciled. + */ + private final StreamsGroupMember member; + + /** + * The target assignment epoch. + */ + private int targetAssignmentEpoch; + + /** + * The target assignment. 
+ */ + private TaskTuple targetAssignment; + + /** + * A function which returns the current process ID of an active task or null if the active task + * is not assigned. The current process ID is the process ID of the current owner. + */ + private BiFunction<String, Integer, String> currentActiveTaskProcessId; + + /** + * A function which returns the current process IDs of a standby task or null if the standby + * task is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds; + + /** + * A function which returns the current process IDs of a warmup task or null if the warmup task + * is not assigned. The current process IDs are the process IDs of all current owners. + */ + private BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds; + + /** + * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request. + */ + private Optional<TaskTuple> ownedTasks = Optional.empty(); + + /** + * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member. + * + * @param member The streams group member that must be reconciled. + */ + public CurrentAssignmentBuilder(StreamsGroupMember member) { + this.member = Objects.requireNonNull(member); + } + + /** + * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to. + * + * @param targetAssignmentEpoch The target assignment epoch. + * @param targetAssignment The target assignment. + * @return This object. + */ + public CurrentAssignmentBuilder withTargetAssignment( + int targetAssignmentEpoch, + TaskTuple targetAssignment + ) { + this.targetAssignmentEpoch = targetAssignmentEpoch; + this.targetAssignment = Objects.requireNonNull(targetAssignment); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process ID of an active task. 
This is + * used by the state machine to determine if an active task is free or still used by another + * member, and if there is still a task on a specific process that is not yet revoked. + * + * @param currentActiveTaskProcessId A BiFunction which gets the memberId of a subtopology id / + * partition id pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentActiveTaskProcessId( + BiFunction<String, Integer, String> currentActiveTaskProcessId + ) { + this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a standby task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentStandbyTaskProcessIds A BiFunction which gets the memberIds of a subtopology + * ids / partition ids pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds + ) { + this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds); + return this; + } + + /** + * Sets a BiFunction which allows to retrieve the current process IDs of a warmup task. This is + * used by the state machine to determine if there is still a task on a specific process that is + * not yet revoked. + * + * @param currentWarmupTaskProcessIds A BiFunction which gets the memberIds of a subtopology ids + * / partition ids pair. + * @return This object. + */ + public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds( + BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds + ) { + this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds); + return this; + } + + /** + * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. 
This is used to + * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide + * its owned tasks in this heartbeat. + * + * @param ownedAssignment A collection of active, standby and warm-up tasks + * @return This object. + */ + protected CurrentAssignmentBuilder withOwnedAssignment( + TaskTuple ownedAssignment + ) { + this.ownedTasks = Optional.ofNullable(ownedAssignment); + return this; + } + + /** + * Builds the next state for the member or keep the current one if it is not possible to move forward with the current state. + * + * @return A new StreamsGroupMember or the current one. + */ + public StreamsGroupMember build() { + switch (member.state()) { + case STABLE: + // When the member is in the STABLE state, we verify if a newer + // epoch (or target assignment) is available. If it is, we can + // reconcile the member towards it. Otherwise, we return. + if (member.memberEpoch() != targetAssignmentEpoch) { + return computeNextAssignment( + member.memberEpoch(), + member.assignedTasks() + ); + } else { + return member; + } + + case UNREVOKED_TASKS: + // When the member is in the UNREVOKED_TASKS state, we wait + // until the member has revoked the necessary tasks. They are + // considered revoked when they are not anymore reported in the + // owned tasks set in the StreamsGroupHeartbeat API. + + // If the member provides its owned tasks, we verify if it still + // owns any of the revoked tasks. If it did not provide it's + // owned tasks, or we still own some of the revoked tasks, we + // cannot progress. + if ( + ownedTasks.isEmpty() || ownedTasks.get().containsAny(member.tasksPendingRevocation()) + ) { + return member; + } + + // When the member has revoked all the pending tasks, it can + // transition to the next epoch (current + 1) and we can reconcile + // its state towards the latest target assignment. 
+ return computeNextAssignment( + member.memberEpoch() + 1, + member.assignedTasks() + ); + + case UNRELEASED_TASKS: + // When the member is in the UNRELEASED_TASKS, we reconcile the + // member towards the latest target assignment. This will assign any + // of the unreleased tasks when they become available. + return computeNextAssignment( + member.memberEpoch(), + member.assignedTasks() + ); + + case UNKNOWN: + // We could only end up in this state if a new state is added in the + // future and the group coordinator is downgraded. In this case, the + // best option is to fence the member to force it to rejoin the group + // without any tasks and to reconcile it again from scratch. + if ((ownedTasks.isEmpty() || !ownedTasks.get().isEmpty())) { + throw new FencedMemberEpochException( + "The streams group member is in a unknown state. " + + "The member must abandon all its tasks and rejoin."); + } + + return computeNextAssignment( + targetAssignmentEpoch, + member.assignedTasks() + ); + } + + return member; + } + + /** + * Takes the current currentAssignment and the targetAssignment, and generates three + * collections: + * + * - the resultAssignedTasks: the tasks that are assigned in both the current and target + * assignments. + * - the resultTasksPendingRevocation: the tasks that are assigned in the current + * assignment but not in the target assignment. + * - the resultTasksPendingAssignment: the tasks that are assigned in the target assignment but + * not in the current assignment, and can be assigned currently (i.e., they are not owned by + * another member, as defined by the `isUnreleasedTask` predicate). 
+ */ + private boolean computeAssignmentDifference( + Map<String, Set<Integer>> currentAssignment, + Map<String, Set<Integer>> targetAssignment, + Map<String, Set<Integer>> resultAssignedTasks, + Map<String, Set<Integer>> resultTasksPendingRevocation, + Map<String, Set<Integer>> resultTasksPendingAssignment, + BiPredicate<String, Integer> isUnreleasedTask + ) { + boolean hasUnreleasedTasks = false; + + Set<String> allSubtopologyIds = new HashSet<>(targetAssignment.keySet()); + allSubtopologyIds.addAll(currentAssignment.keySet()); + + for (String subtopologyId : allSubtopologyIds) { + Set<Integer> currentPartitions = currentAssignment.getOrDefault(subtopologyId, Collections.emptySet()); Review Comment: I did that on purpose. Otherwise, we would call both (String, Integer) and (Integer) a task, which I found very confusing. Also, in the `TaskId` class, we call the integer the partition. But if you think my solution is more confusing, we can also call both (String, Integer) and Integer a task. We'll get naming collisions: `resultAssignedTasks` for all subtopologies vs. `resultAssignedTasks` for one subtopology. I appended `ForThisSubtopology` to all the collections to make it compile. Done -- This is an automated message from the Apache Git Service.
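The per-subtopology naming discussed in the last comment can be illustrated with a minimal sketch of the set arithmetic that the javadoc of `computeAssignmentDifference` describes. The quoted diff is cut off after the pending-assignment comment, so the remainder here (filtering unreleased tasks through `isUnreleasedTask`, skipping empty sets, and returning whether any task was held back) is an assumption based on that javadoc and the `hasUnreleasedTasks` variable. The class `AssignmentDifferenceSketch` and the `...ForThisSubtopology` local names are illustrative and not copied from the final PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

// Hypothetical sketch of the per-subtopology set arithmetic; not the PR's final code.
final class AssignmentDifferenceSketch {

    static boolean computeAssignmentDifference(
        Map<String, Set<Integer>> currentAssignment,
        Map<String, Set<Integer>> targetAssignment,
        Map<String, Set<Integer>> resultAssignedTasks,
        Map<String, Set<Integer>> resultTasksPendingRevocation,
        Map<String, Set<Integer>> resultTasksPendingAssignment,
        BiPredicate<String, Integer> isUnreleasedTask
    ) {
        boolean hasUnreleasedTasks = false;

        Set<String> allSubtopologyIds = new HashSet<>(targetAssignment.keySet());
        allSubtopologyIds.addAll(currentAssignment.keySet());

        for (String subtopologyId : allSubtopologyIds) {
            // The suffix keeps these locals distinct from the result* map parameters.
            Set<Integer> currentTasksForThisSubtopology =
                currentAssignment.getOrDefault(subtopologyId, Set.of());
            Set<Integer> targetTasksForThisSubtopology =
                targetAssignment.getOrDefault(subtopologyId, Set.of());

            // Assigned = Current ∩ Target
            Set<Integer> resultAssignedTasksForThisSubtopology = new HashSet<>(currentTasksForThisSubtopology);
            resultAssignedTasksForThisSubtopology.retainAll(targetTasksForThisSubtopology);

            // Pending revocation = Current - Assigned
            Set<Integer> resultTasksPendingRevocationForThisSubtopology = new HashSet<>(currentTasksForThisSubtopology);
            resultTasksPendingRevocationForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);

            // Pending assignment = Target - Assigned - Unreleased (assumed completion of the cut-off code)
            Set<Integer> resultTasksPendingAssignmentForThisSubtopology = new HashSet<>(targetTasksForThisSubtopology);
            resultTasksPendingAssignmentForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
            hasUnreleasedTasks |= resultTasksPendingAssignmentForThisSubtopology
                .removeIf(partitionId -> isUnreleasedTask.test(subtopologyId, partitionId));

            if (!resultAssignedTasksForThisSubtopology.isEmpty()) {
                resultAssignedTasks.put(subtopologyId, resultAssignedTasksForThisSubtopology);
            }
            if (!resultTasksPendingRevocationForThisSubtopology.isEmpty()) {
                resultTasksPendingRevocation.put(subtopologyId, resultTasksPendingRevocationForThisSubtopology);
            }
            if (!resultTasksPendingAssignmentForThisSubtopology.isEmpty()) {
                resultTasksPendingAssignment.put(subtopologyId, resultTasksPendingAssignmentForThisSubtopology);
            }
        }
        return hasUnreleasedTasks;
    }
}
```

For example, with current partitions {0, 1} and target partitions {1, 2, 3} for one subtopology, where partition 3 is still owned elsewhere, the sketch keeps {1}, asks for {0} to be revoked, offers only {2} for assignment, and reports that an unreleased task was held back.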