mjsax commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287695148
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java: ########## @@ -45,75 +81,162 @@ private static final class Harness { private final Map<UUID, ClientState> droppedClientStates; private final StringBuilder history = new StringBuilder(); + public final Map<TaskId, Set<TopicPartition>> partitionsForTask; + public final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask; + public final Map<Subtopology, Set<TaskId>> tasksForTopicGroup; + public final Cluster fullMetadata; + public final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer; + public final InternalTopicManager internalTopicManager; + private static Harness initializeCluster(final int numStatelessTasks, final int numStatefulTasks, - final int numNodes, - final Supplier<Integer> partitionCountSupplier) { + final int numClients, + final Supplier<Integer> partitionCountSupplier, + final int numNodes) { int subtopology = 0; final Set<TaskId> statelessTasks = new TreeSet<>(); int remainingStatelessTasks = numStatelessTasks; + final List<Node> nodes = getRandomNodes(numNodes); + int nodeIndex = 0; + final Set<PartitionInfo> partitionInfoSet = new HashSet<>(); + final Map<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>(); + final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask = new HashMap<>(); + final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<>(); + while (remainingStatelessTasks > 0) { final int partitions = Math.min(remainingStatelessTasks, partitionCountSupplier.get()); for (int i = 0; i < partitions; i++) { - statelessTasks.add(new TaskId(subtopology, i)); + final TaskId taskId = new TaskId(subtopology, i); + statelessTasks.add(taskId); remainingStatelessTasks--; + + final Node[] replica = getRandomReplica(nodes, nodeIndex); + partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica)); + nodeIndex++; + + partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i))); + tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId); } subtopology++; } final Map<TaskId, Long> statefulTaskEndOffsetSums = new TreeMap<>(); + final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>(); + final Set<String> changelogNames = new HashSet<>(); int remainingStatefulTasks = numStatefulTasks; while (remainingStatefulTasks > 0) { + final String changelogTopicName = CHANGELOG_TOPIC_PREFIX + "_" + subtopology; + changelogNames.add(changelogTopicName); final int partitions = Math.min(remainingStatefulTasks, partitionCountSupplier.get()); for (int i = 0; i < partitions; i++) { - statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 150000L); + final TaskId taskId = new TaskId(subtopology, i); + statefulTaskEndOffsetSums.put(taskId, 150000L); remainingStatefulTasks--; + + Node[] replica = getRandomReplica(nodes, nodeIndex); + partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica)); + nodeIndex++; + + partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i))); + changelogPartitionsForTask.put(taskId, mkSet(new TopicPartition(changelogTopicName, i))); + tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId); + + final Random random = new Random(); Review Comment: We should init random with a value that we log (to allow us to reproduce the same test) ``` final long seed = System.currentTimeMillis(): log.info(seed); // or just use println final Random random = new Random(seed); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org