mimaison commented on code in PR #11974:
URL: https://github.com/apache/kafka/pull/11974#discussion_r841936725


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -34,25 +34,30 @@
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
-import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;

Review Comment:
   Could we remove Hamcrest now and only use JUnit?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1275,89 +1136,68 @@ private static ClusterConfigState clusterConfigState(long offset,
                                                          int connectorNum,
                                                          int taskNum) {
         int connectorNumEnd = connectorStart + connectorNum - 1;
+
+        Map<String, Integer> connectorTaskCounts = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> taskNum);
+        Map<String, Map<String, String>> connectorConfigs = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, HashMap::new);
+        Map<String, TargetState> connectorTargetStates = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> TargetState.STARTED);
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = fillMap(
+                0,
+                connectorNum * taskNum,
+                i -> new ConnectorTaskId("connector" + i / connectorNum + 1, i),
+                HashMap::new
+        );
+
         return new ClusterConfigState(
                 offset,
                 null,
-                connectorTaskCounts(connectorStart, connectorNumEnd, taskNum),
-                connectorConfigs(connectorStart, connectorNumEnd),
-                connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED),
-                taskConfigs(0, connectorNum, connectorNum * taskNum),
+                connectorTaskCounts,
+                connectorConfigs,
+                connectorTargetStates,
+                taskConfigs,
                 Collections.emptySet());
     }
 
-    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
-                                                                  long givenOffset,
-                                                                  Map<String, ExtendedAssignment> givenAssignments) {
-        return givenAssignments.entrySet().stream()
-                .collect(Collectors.toMap(
-                    Map.Entry::getKey,
-                    e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue())));
-    }
-
-    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
+    private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
                                                                   long givenOffset,
                                                                   int start,
                                                                   int connectorNum) {

Review Comment:
   Should this be `workerNum` instead of `connectorNum`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -34,25 +34,30 @@
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
-import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.runners.Parameterized.Parameter;

Review Comment:
   Not related to the changes in this PR: we define test parameters with 
`mode()`, but the class is not annotated with `@RunWith(value = 
Parameterized.class)`, so `protocolVersion` is never initialized and is always 
0.
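
A dependency-free sketch of why the missing runner matters. An annotation is only metadata, so a field marked as a parameter keeps Java's default value unless something reads the annotation and injects a value, which is the job JUnit 4's `Parameterized` runner performs. Every name below (`Param`, `TestStandIn`, `createWithRunner`, `createWithoutRunner`) is a hypothetical stand-in, not code from the PR or from JUnit, and the `int` field type is an assumption.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class ParameterInjectionSketch {

    /** Hypothetical stand-in for JUnit's @Parameter: pure metadata, inert by itself. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface Param { }

    /** Hypothetical stand-in for the test class under discussion. */
    public static class TestStandIn {
        @Param
        public int protocolVersion;
    }

    /** No runner reads the annotation, so the field keeps Java's default of 0. */
    public static TestStandIn createWithoutRunner() {
        return new TestStandIn();
    }

    /** Roughly what a parameterized runner does: find annotated fields and set them reflectively. */
    public static TestStandIn createWithRunner(int value) {
        TestStandIn instance = new TestStandIn();
        try {
            for (Field field : TestStandIn.class.getDeclaredFields()) {
                if (field.isAnnotationPresent(Param.class)) {
                    field.setInt(instance, value);
                }
            }
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Could not inject parameter", e);
        }
        return instance;
    }

    public static void main(String[] args) {
        System.out.println("without runner: " + createWithoutRunner().protocolVersion);
        System.out.println("with runner:    " + createWithRunner(2).protocolVersion);
    }
}
```

In the real test class the equivalent fix would presumably be to add the `@RunWith` annotation mentioned above, or to drop the unused parameter definition.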



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java:
##########
@@ -1275,89 +1136,68 @@ private static ClusterConfigState clusterConfigState(long offset,
                                                          int connectorNum,
                                                          int taskNum) {
         int connectorNumEnd = connectorStart + connectorNum - 1;
+
+        Map<String, Integer> connectorTaskCounts = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> taskNum);
+        Map<String, Map<String, String>> connectorConfigs = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, HashMap::new);
+        Map<String, TargetState> connectorTargetStates = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> TargetState.STARTED);
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = fillMap(
+                0,
+                connectorNum * taskNum,
+                i -> new ConnectorTaskId("connector" + i / connectorNum + 1, i),
+                HashMap::new
+        );
+
         return new ClusterConfigState(
                 offset,
                 null,
-                connectorTaskCounts(connectorStart, connectorNumEnd, taskNum),
-                connectorConfigs(connectorStart, connectorNumEnd),
-                connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED),
-                taskConfigs(0, connectorNum, connectorNum * taskNum),
+                connectorTaskCounts,
+                connectorConfigs,
+                connectorTargetStates,
+                taskConfigs,
                 Collections.emptySet());
     }
 
-    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
-                                                                  long givenOffset,
-                                                                  Map<String, ExtendedAssignment> givenAssignments) {
-        return givenAssignments.entrySet().stream()
-                .collect(Collectors.toMap(
-                    Map.Entry::getKey,
-                    e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue())));
-    }
-
-    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
+    private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
                                                                   long givenOffset,
                                                                   int start,
                                                                   int connectorNum) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("worker" + i, new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null)))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
-    }
-
-    private static Map<String, Integer> connectorTaskCounts(int start,
-                                                            int connectorNum,
-                                                            int taskCounts) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("connector" + i, taskCounts))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
-    }
-
-    private static Map<String, Map<String, String>> connectorConfigs(int start, int connectorNum) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("connector" + i, new HashMap<String, String>()))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
-    }
-
-    private static Map<String, TargetState> connectorTargetStates(int start,
-                                                                  int connectorNum,
-                                                                  TargetState state) {
-        return IntStream.range(start, connectorNum + 1)
-                .mapToObj(i -> new SimpleEntry<>("connector" + i, state))
-                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+        return fillMap(

Review Comment:
   I wonder if building this map explicitly may be more readable. What about 
something like:
   ```
   private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, String ... workers)
   ```
   
   Then you call it with `memberConfigs(leader, offset, W0, W1)`. It's slightly 
less flexible, but since all tests only use a few workers I think this could 
make the code a bit simpler.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
