mjsax commented on code in PR #14164:
URL: https://github.com/apache/kafka/pull/14164#discussion_r1287697676


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##########
@@ -45,75 +81,162 @@ private static final class Harness {
         private final Map<UUID, ClientState> droppedClientStates;
         private final StringBuilder history = new StringBuilder();
 
+        public final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+        public final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask;
+        public final Map<Subtopology, Set<TaskId>> tasksForTopicGroup;
+        public final Cluster fullMetadata;
+        public final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer;
+        public final InternalTopicManager internalTopicManager;
+
         private static Harness initializeCluster(final int numStatelessTasks,
                                                  final int numStatefulTasks,
-                                                 final int numNodes,
-                                                 final Supplier<Integer> partitionCountSupplier) {
+                                                 final int numClients,
+                                                 final Supplier<Integer> partitionCountSupplier,
+                                                 final int numNodes) {
             int subtopology = 0;
             final Set<TaskId> statelessTasks = new TreeSet<>();
             int remainingStatelessTasks = numStatelessTasks;
+            final List<Node> nodes = getRandomNodes(numNodes);
+            int nodeIndex = 0;
+            final Set<PartitionInfo> partitionInfoSet = new HashSet<>();
+            final Map<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
+            final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask = new HashMap<>();
+            final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<>();
+
             while (remainingStatelessTasks > 0) {
                 final int partitions = Math.min(remainingStatelessTasks, partitionCountSupplier.get());
                 for (int i = 0; i < partitions; i++) {
-                    statelessTasks.add(new TaskId(subtopology, i));
+                    final TaskId taskId = new TaskId(subtopology, i);
+                    statelessTasks.add(taskId);
                     remainingStatelessTasks--;
+
+                    final Node[] replica = getRandomReplica(nodes, nodeIndex);
+                    partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica));
+                    nodeIndex++;
+
+                    partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i)));
+                    tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId);
                 }
                 subtopology++;
             }
 
             final Map<TaskId, Long> statefulTaskEndOffsetSums = new TreeMap<>();
+            final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+            final Set<String> changelogNames = new HashSet<>();
             int remainingStatefulTasks = numStatefulTasks;
             while (remainingStatefulTasks > 0) {
+                final String changelogTopicName = CHANGELOG_TOPIC_PREFIX + "_" + subtopology;
+                changelogNames.add(changelogTopicName);
                 final int partitions = Math.min(remainingStatefulTasks, partitionCountSupplier.get());
                 for (int i = 0; i < partitions; i++) {
-                    statefulTaskEndOffsetSums.put(new TaskId(subtopology, i), 150000L);
+                    final TaskId taskId = new TaskId(subtopology, i);
+                    statefulTaskEndOffsetSums.put(taskId, 150000L);
                     remainingStatefulTasks--;
+
+                    Node[] replica = getRandomReplica(nodes, nodeIndex);
+                    partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + "_" + subtopology, i, replica[0], replica, replica));
+                    nodeIndex++;
+
+                    partitionsForTask.put(taskId, mkSet(new TopicPartition(TOPIC_PREFIX + "_" + subtopology, i)));
+                    changelogPartitionsForTask.put(taskId, mkSet(new TopicPartition(changelogTopicName, i)));
+                    tasksForTopicGroup.computeIfAbsent(new Subtopology(subtopology, null), k -> new HashSet<>()).add(taskId);
+
+                    final Random random = new Random();
+                    final int changelogNodeIndex = random.nextInt(nodes.size());
+                    replica = getRandomReplica(nodes, changelogNodeIndex);
+                    final TopicPartitionInfo info = new TopicPartitionInfo(i, replica[0], Arrays.asList(replica[0], replica[1]), Collections.emptyList());
+                    topicPartitionInfo.computeIfAbsent(changelogTopicName, tp -> new ArrayList<>()).add(info);
                 }
                 subtopology++;
             }
 
+            final MockTime time = new MockTime();
+            final StreamsConfig streamsConfig = new StreamsConfig(configProps(true));
+            final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+            final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+                time,
+                streamsConfig,
+                mockClientSupplier.restoreConsumer,
+                false
+            );
+            final InternalTopicManager spyTopicManager = spy(mockInternalTopicManager);
+            doReturn(topicPartitionInfo).when(spyTopicManager).getTopicPartitionInfo(changelogNames);
+
+            final Cluster cluster = new Cluster(
+                "cluster",
+                new HashSet<>(nodes),
+                partitionInfoSet,
+                Collections.emptySet(),
+                Collections.emptySet()
+            );
+
             final Map<UUID, ClientState> clientStates = new TreeMap<>();
-            for (int i = 0; i < numNodes; i++) {
+            final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer = new HashMap<>();
+            for (int i = 0; i < numClients; i++) {
                 final UUID uuid = uuidForInt(i);
                 clientStates.put(uuid, emptyInstance(uuid, statefulTaskEndOffsetSums));
+                final Random random = new Random();

Review Comment:
   as above: maybe set up `Random` inside `@BeforeTest` instead and share it?
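
   For illustration only, a minimal sketch of that suggestion, assuming JUnit 4's `@Before` plays the role of the `@BeforeTest` hook mentioned above; the class name, seed, and test body below are hypothetical and not taken from the PR:

   ```java
   import java.util.Random;

   import org.junit.Assert;
   import org.junit.Before;
   import org.junit.Test;

   public class SharedRandomSketchTest {

       // One Random shared by all helpers instead of `new Random()` at each
       // call site; static because the harness methods in the real test are
       // static, and a fixed seed keeps failing runs reproducible.
       private static Random random;

       @Before
       public void setUp() {
           random = new Random(17L);
       }

       @Test
       public void drawsFromSharedRandom() {
           // A call site such as picking the changelog node index would draw
           // from the shared instance rather than constructing its own Random.
           final int changelogNodeIndex = random.nextInt(5);
           Assert.assertTrue(changelogNodeIndex >= 0 && changelogNodeIndex < 5);
       }
   }
   ```

   Sharing one seeded instance also avoids allocating a new `Random` per task in the inner loops.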


