showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r448103331
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ########## @@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() { assertThat( appender.getMessages(), - hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" + - " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.") + hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" + + "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.") ); } } + @Test + public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() { + final AdminClient admin = EasyMock.createNiceMock(AdminClient.class); + final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config)); + + final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>(); + topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!")); + final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>(); + topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!")); + final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>(); + topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class)); + + EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic))) + .andReturn(new MockDescribeTopicsResult( + Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture))) + .once(); + // return empty set for 1st time + EasyMock.expect(admin.createTopics(Collections.emptySet())) + .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once(); + + EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic))) + .andReturn(new MockDescribeTopicsResult( + Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture))) + .once(); + EasyMock.expect(admin.createTopics(Collections.singleton( + new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1)) + .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), + Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"), + Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"), + Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1")))))) + .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once(); + + EasyMock.replay(admin); + + final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap()); + internalTopicConfig.setNumberOfPartitions(1); + final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>(); + topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig); + topicManager.makeReady(topicConfigMap); + + EasyMock.verify(admin); + } + + @Test + public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() { + final AdminClient admin = EasyMock.createNiceMock(AdminClient.class); + final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config)); + final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1, + Collections.singletonList(broker1), Collections.singletonList(broker1)); + + final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>(); + topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!")); + final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>(); + topicDescriptionSuccessFuture.complete( + new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet()) + ); + + EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic))) + .andReturn(new MockDescribeTopicsResult( + Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture))) + .once(); + EasyMock.expect(admin.createTopics(Collections.emptySet())) + .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once(); + + EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic))) Review comment: good. Thanks. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org