lihaosky commented on code in PR #14164: URL: https://github.com/apache/kafka/pull/14164#discussion_r1287753972
########## streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java: ########## @@ -91,6 +91,7 @@ private Properties effectiveConfigFrom(final Properties initialConfig) { effectiveConfig.put(KafkaConfig.MessageMaxBytesProp(), 1000000); effectiveConfig.put(KafkaConfig.ControlledShutdownEnableProp(), true); effectiveConfig.put(KafkaConfig.ZkSessionTimeoutMsProp(), 10000); + effectiveConfig.put(KafkaConfig.RackProp(), "rack0"); Review Comment: Good catch. Might be some left over change. Let me delete it ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java: ########## @@ -272,27 +300,50 @@ private void createMockTaskManager(final Set<TaskId> activeTasks, // If mockCreateInternalTopics is true the internal topic manager will report that it had to create all internal // topics and we will skip the listOffsets request for these changelogs private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics) { - final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( + return overwriteInternalTopicManagerWithMock(mockCreateInternalTopics, null); + } + + private MockInternalTopicManager overwriteInternalTopicManagerWithMock(final boolean mockCreateInternalTopics, final List<Map<String, List<TopicPartitionInfo>>> topicPartitionInfo) { + final MockInternalTopicManager mockInternalTopicManager = spy(new MockInternalTopicManager( time, new StreamsConfig(configProps()), mockClientSupplier.restoreConsumer, mockCreateInternalTopics - ); + )); + + if (topicPartitionInfo != null) { + lenient().when(mockInternalTopicManager.getTopicPartitionInfo(anySet())).thenAnswer( + i -> { + final Set<String> topics = i.getArgument(0); + for (final Map<String, List<TopicPartitionInfo>> tp : topicPartitionInfo) { + if (topics.equals(tp.keySet())) { + return tp; + } + } + return null; Review Comment: We can return empty Map. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org