showuon commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r857235521


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1142,107 +982,65 @@ private void removeConnector(String connector) {
         );
     }
 
-    private void updateConfigSnapshot() {
-        when(coordinator.configSnapshot()).thenReturn(configState());
-    }
-
     private ClusterConfigState configState() {
         Map<String, Integer> taskCounts = new HashMap<>(connectors);
-        Map<String, Map<String, String>> connectorConfigs = 
taskCounts.keySet().stream().collect(Collectors.toMap(
-                Function.identity(),
-                connector -> Collections.emptyMap()
-        ));
-        Map<String, TargetState> targetStates = 
taskCounts.keySet().stream().collect(Collectors.toMap(
-                Function.identity(),
-                connector -> TargetState.STARTED
-        ));
+        Map<String, Map<String, String>> connectorConfigs = 
transformValues(taskCounts, c -> Collections.emptyMap());
+        Map<String, TargetState> targetStates = transformValues(taskCounts, c 
-> TargetState.STARTED);
         Map<ConnectorTaskId, Map<String, String>> taskConfigs = 
taskCounts.entrySet().stream()
                 .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> 
new ConnectorTaskId(e.getKey(), i)))
                 .collect(Collectors.toMap(
                         Function.identity(),
                         connectorTaskId -> Collections.emptyMap()
                 ));
         return new ClusterConfigState(
-                offset,
+                16,

Review Comment:
   Could you explain the `16` here? Thanks.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -44,93 +35,49 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
-import static 
org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static 
org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static 
org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
-    @Rule
-    public MockitoRule rule = MockitoJUnit.rule();
-
-    @Mock
-    private WorkerCoordinator coordinator;
-
-    @Captor
-    ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
-
-    @Parameters
-    public static Iterable<?> mode() {
-        return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-    }

Review Comment:
   So we no longer need to test these two modes (`CONNECT_PROTOCOL_V1` and 
`CONNECT_PROTOCOL_V2`)?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -667,17 +706,12 @@ private static <T> Map<String, Collection<T>> 
diff(Map<String, Collection<T>> ba
         return incremental;
     }
 
-    private ConnectorsAndTasks assignment(Map<String, ExtendedWorkerState> 
memberConfigs) {
-        log.debug("Received assignments: {}", memberConfigs);
-        Set<String> connectors = memberConfigs.values()
-                .stream()
-                .flatMap(state -> state.assignment().connectors().stream())
-                .collect(Collectors.toSet());
-        Set<ConnectorTaskId> tasks = memberConfigs.values()
-                .stream()
-                .flatMap(state -> state.assignment().tasks().stream())
-                .collect(Collectors.toSet());
-        return new ConnectorsAndTasks.Builder().with(connectors, 
tasks).build();
+    private ConnectorsAndTasks assignment(Map<String, ConnectorsAndTasks> 
memberAssignments) {
+        log.debug("Received assignments: {}", memberAssignments);
+        return new ConnectorsAndTasks.Builder().with(
+                ConnectUtils.combineCollections(memberAssignments.values(), 
ConnectorsAndTasks::connectors),
+                ConnectUtils.combineCollections(memberAssignments.values(), 
ConnectorsAndTasks::tasks)

Review Comment:
   Nice cleanup



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -298,20 +329,21 @@ protected Map<String, ByteBuffer> 
performTaskAssignment(String leaderId, long ma
         Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
                 diff(taskAssignments, currentTaskAssignments);
 
+        previousAssignment = computePreviousAssignment(toRevoke, 
connectorAssignments, taskAssignments, lostAssignments);
+        previousGenerationId = currentGenerationId;
+        previousMembers = memberAssignments.keySet();
+
         log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
         log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
 
-        coordinator.leaderState(new LeaderState(memberConfigs, 
connectorAssignments, taskAssignments));

Review Comment:
   I'm not 100% confident that `connectorAssignments` == 
`clusterAssignment.allAssignedConnectors()` and `taskAssignments` == 
`clusterAssignment.allAssignedTasks()`. Could you help explain and confirm 
this?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -745,22 +779,108 @@ protected void assignTasks(List<WorkerLoad> 
workerAssignment, Collection<Connect
         }
     }
 
-    private static List<WorkerLoad> workerAssignment(Map<String, 
ExtendedWorkerState> memberConfigs,
+    private static List<WorkerLoad> workerAssignment(Map<String, 
ConnectorsAndTasks> memberAssignments,
                                                      ConnectorsAndTasks 
toExclude) {
         ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
                 .with(new HashSet<>(toExclude.connectors()), new 
HashSet<>(toExclude.tasks()))
                 .build();
 
-        return memberConfigs.entrySet().stream()
+        return memberAssignments.entrySet().stream()
                 .map(e -> new WorkerLoad.Builder(e.getKey()).with(
-                        e.getValue().assignment().connectors().stream()
+                        e.getValue().connectors().stream()
                                 .filter(v -> !ignore.connectors().contains(v))
                                 .collect(Collectors.toList()),
-                        e.getValue().assignment().tasks().stream()
+                        e.getValue().tasks().stream()
                                 .filter(v -> !ignore.tasks().contains(v))
                                 .collect(Collectors.toList())
                         ).build()
                 ).collect(Collectors.toList());
     }
 
+    static class ClusterAssignment {

Review Comment:
   I think we need to override `toString` for `ClusterAssignment`. WDYT?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -344,9 +376,9 @@ private ConnectorsAndTasks 
computePreviousAssignment(Map<String, ConnectorsAndTa
                                                          Map<String, 
Collection<ConnectorTaskId>> taskAssignments,
                                                          ConnectorsAndTasks 
lostAssignments) {
         ConnectorsAndTasks previousAssignment = new 
ConnectorsAndTasks.Builder().with(
-                
connectorAssignments.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
-                taskAssignments.values() .stream() 
.flatMap(Collection::stream).collect(Collectors.toSet()))
-                .build();
+                ConnectUtils.combineCollections(connectorAssignments.values()),
+                ConnectUtils.combineCollections(taskAssignments.values())

Review Comment:
   nice cleanup



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to