mimaison commented on code in PR #11974: URL: https://github.com/apache/kafka/pull/11974#discussion_r841936725
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -34,25 +34,30 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad; -import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.is; Review Comment: Could we remove hamcrest now and only use junit? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -1275,89 +1136,68 @@ private static ClusterConfigState clusterConfigState(long offset, int connectorNum, int taskNum) { int connectorNumEnd = connectorStart + connectorNum - 1; + + Map<String, Integer> connectorTaskCounts = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> taskNum); + Map<String, Map<String, String>> connectorConfigs = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, HashMap::new); + Map<String, TargetState> connectorTargetStates = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> TargetState.STARTED); + Map<ConnectorTaskId, Map<String, String>> taskConfigs = fillMap( + 0, + connectorNum * taskNum, + i -> new ConnectorTaskId("connector" + i / connectorNum + 1, i), + HashMap::new + ); + return new ClusterConfigState( offset, null, - connectorTaskCounts(connectorStart, connectorNumEnd, taskNum), - connectorConfigs(connectorStart, connectorNumEnd), - connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED), - taskConfigs(0, connectorNum, connectorNum * taskNum), + connectorTaskCounts, + connectorConfigs, + connectorTargetStates, + taskConfigs, Collections.emptySet()); } - private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, - long givenOffset, - Map<String, ExtendedAssignment> givenAssignments) { - return givenAssignments.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue()))); - } - - private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, + private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, int start, int connectorNum) { Review Comment: Should this be `workerNum` instead of `connectorNum`? 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -34,25 +34,30 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad; -import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.runners.Parameterized.Parameter; Review Comment: Not related to changes in this PR. I see we are defining a test parameter with `mode()` but the class is not annotated with `@RunWith(value = Parameterized.class)` so `protocolVersion` is never initialized and always set to 0. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ########## @@ -1275,89 +1136,68 @@ private static ClusterConfigState clusterConfigState(long offset, int connectorNum, int taskNum) { int connectorNumEnd = connectorStart + connectorNum - 1; + + Map<String, Integer> connectorTaskCounts = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> taskNum); + Map<String, Map<String, String>> connectorConfigs = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, HashMap::new); + Map<String, TargetState> connectorTargetStates = fillMap(connectorStart, connectorNumEnd, i -> "connector" + i, () -> TargetState.STARTED); + Map<ConnectorTaskId, Map<String, String>> taskConfigs = fillMap( + 0, + connectorNum * taskNum, + i -> new ConnectorTaskId("connector" + i / connectorNum + 1, i), + HashMap::new + ); + return new ClusterConfigState( offset, null, - connectorTaskCounts(connectorStart, connectorNumEnd, taskNum), - connectorConfigs(connectorStart, connectorNumEnd), - connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED), - taskConfigs(0, connectorNum, connectorNum * taskNum), + connectorTaskCounts, + connectorConfigs, + connectorTargetStates, + taskConfigs, Collections.emptySet()); } - private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, - long givenOffset, - Map<String, ExtendedAssignment> givenAssignments) { - return givenAssignments.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue()))); - } - - private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, + private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, int start, int connectorNum) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("worker" + i, new 
ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null))) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); - } - - private static Map<String, Integer> connectorTaskCounts(int start, - int connectorNum, - int taskCounts) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("connector" + i, taskCounts)) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); - } - - private static Map<String, Map<String, String>> connectorConfigs(int start, int connectorNum) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("connector" + i, new HashMap<String, String>())) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); - } - - private static Map<String, TargetState> connectorTargetStates(int start, - int connectorNum, - TargetState state) { - return IntStream.range(start, connectorNum + 1) - .mapToObj(i -> new SimpleEntry<>("connector" + i, state)) - .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); + return fillMap( Review Comment: I wonder if building this map explicitly may be more readable. What about something like: ``` private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, String ... workers) ``` Then you call it with `memberConfigs(leader, offset, W0, W1)`. It's slightly less flexible, but since all tests only use a few workers I think this could make the code a bit simpler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org