mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287695148


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##########
@@ -45,75 +81,162 @@ private static final class Harness {
         private final Map<UUID, ClientState> droppedClientStates;
         private final StringBuilder history = new StringBuilder();
 
+        public final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+        public final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask;
+        public final Map<Subtopology, Set<TaskId>> tasksForTopicGroup;
+        public final Cluster fullMetadata;
+        public final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer;
+        public final InternalTopicManager internalTopicManager;
+
         private static Harness initializeCluster(final int numStatelessTasks,
                                                  final int numStatefulTasks,
-                                                 final int numNodes,
-                                                 final Supplier<Integer> partitionCountSupplier) {
+                                                 final int numClients,
+                                                 final Supplier<Integer> partitionCountSupplier,
+                                                 final int numNodes) {
             int subtopology = 0;
             final Set<TaskId> statelessTasks = new TreeSet<>();
             int remainingStatelessTasks = numStatelessTasks;
+            final List<Node> nodes = getRandomNodes(numNodes);
+            int nodeIndex = 0;
+            final Set<PartitionInfo> partitionInfoSet = new HashSet<>();
+            final Map<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
+            final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask = new HashMap<>();
+            final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
+
             while (remainingStatelessTasks > 0) {
                final int partitions = Math.min(remainingStatelessTasks, partitionCountSupplier.get());
                 for (int i = 0; i < partitions; i++) {
-                    statelessTasks.add(new TaskId(subtopology, i));
+                    final TaskId taskId = new TaskId(subtopology, i);
+                    statelessTasks.add(taskId);
                     remainingStatelessTasks--;
+
+                    final Node[] replica = getRandomReplica(nodes, nodeIndex);
+                    partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica));
+                    nodeIndex++;
+
+                    partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i)));
+                    tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId);
                 }
                 subtopology++;
             }
 
            final Map<TaskId, Long> statefulTaskEndOffsetSums = new TreeMap<>();
+            final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+            final Set<String> changelogNames = new HashSet<>();
             int remainingStatefulTasks = numStatefulTasks;
             while (remainingStatefulTasks > 0) {
+                final String changelogTopicName = CHANGELOG_TOPIC_PREFIX + "_" + subtopology;
+                changelogNames.add(changelogTopicName);
                final int partitions = Math.min(remainingStatefulTasks, partitionCountSupplier.get());
                 for (int i = 0; i < partitions; i++) {
-                    statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 150000L);
+                    final TaskId taskId = new TaskId(subtopology, i);
+                    statefulTaskEndOffsetSums.put(taskId, 150000L);
                     remainingStatefulTasks--;
+
+                    Node[] replica = getRandomReplica(nodes, nodeIndex);
+                    partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica));
+                    nodeIndex++;
+
+                    partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i)));
+                    changelogPartitionsForTask.put(taskId, mkSet(new TopicPartition(changelogTopicName, i)));
+                    tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId);
+
+                    final Random random = new Random();

Review Comment:
   We should init the `Random` with a seed value that we log, so we can reproduce the same test run:
   ```
    final long seed = System.currentTimeMillis();
    log.info("seed: {}", seed); // or just use println
   final Random random = new Random(seed);
   ```
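
   For reference, a minimal self-contained sketch of that pattern (the class name `SeededRandomExample` and the printed messages are illustrative assumptions, not part of the test code). Reusing one seeded `Random` across the harness, rather than constructing a new `Random` inside the loop, keeps the logged seed sufficient to replay the whole run:
   ```
   import java.util.Random;

   public class SeededRandomExample {
       public static void main(final String[] args) {
           // Log the seed up front so a failing run can be replayed with the same inputs.
           final long seed = System.currentTimeMillis();
           System.out.println("Test seed: " + seed);

           // One Random instance for the whole run; the logged seed fully determines
           // every value drawn from it.
           final Random random = new Random(seed);
           System.out.println("First random rack index: " + random.nextInt(10));
       }
   }
   ```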


