showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r448099358
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ########## @@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() { assertThat( appender.getMessages(), - hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" + - " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.") + hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" + + "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.") ); } } + @Test + public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() { + final AdminClient admin = EasyMock.createNiceMock(AdminClient.class); + final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config)); + + final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>(); + topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!")); + final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>(); + topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!")); + final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>(); + topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class)); + + EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic))) + .andReturn(new MockDescribeTopicsResult( + Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture))) + .once(); + // return empty set for 1st time + EasyMock.expect(admin.createTopics(Collections.emptySet())) + .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once(); + + EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic))) + .andReturn(new MockDescribeTopicsResult( + Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture))) + .once(); + EasyMock.expect(admin.createTopics(Collections.singleton( + new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1)) + .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), + Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"), + Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"), + Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1")))))) + .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once(); + + EasyMock.replay(admin); + + final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap()); + internalTopicConfig.setNumberOfPartitions(1); + final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>(); + topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig); + topicManager.makeReady(topicConfigMap); + + EasyMock.verify(admin); + } + + @Test + public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() { Review comment: Yes, they are different scenarios. You can check below diagram for reference. (red and green for different cases)  ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org