abbccdda commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r436283473



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -100,11 +103,11 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         // have existed with the expected number of partitions, or some create topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition assignor.", topics);
 
-        int remainingRetries = retries;
+        remainingRetries = retries;

Review comment:
       This is a personal preference, but I think we should not keep a 
temporary variable as part of the class state. We could change the internal 
function signatures to pass `remainingRetries` around (like `validateTopics`) 
instead.
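
A minimal sketch of this suggestion, not the actual Kafka code. `RetrySketch`, `attemptOnce`, and the `attemptsNeeded` map are hypothetical names invented for the illustration. The retry budget is a local variable of the entry point and is handed to the helper as a parameter, so nothing that belongs to a single call survives in a field.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for InternalTopicManager; names are illustrative only.
final class RetrySketch {
    private final int retries; // configuration, never mutated
    // Demo assumption: topic name -> attempt number at which it becomes ready.
    private final Map<String, Integer> attemptsNeeded;

    RetrySketch(final int retries, final Map<String, Integer> attemptsNeeded) {
        this.retries = retries;
        this.attemptsNeeded = new HashMap<>(attemptsNeeded);
    }

    // Returns true once every topic is ready, false if the retry budget runs out.
    boolean makeReady() {
        final Map<String, Integer> pending = new HashMap<>(attemptsNeeded);
        int remainingRetries = retries; // local: scoped to this call only
        while (true) {
            attemptOnce(pending, remainingRetries);
            if (pending.isEmpty()) {
                return true;
            }
            if (remainingRetries == 0) {
                return false;
            }
            remainingRetries--;
        }
    }

    // The helper receives the counter as an argument instead of reading a mutable field.
    private void attemptOnce(final Map<String, Integer> pending, final int remainingRetries) {
        final int attempt = retries - remainingRetries + 1;
        pending.values().removeIf(needed -> needed <= attempt);
    }
}
```

Because the counter is re-initialised inside `makeReady`, repeated calls on the same instance behave identically.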

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -195,20 +198,30 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
             final String topicName = topicFuture.getKey();
             try {
                 final TopicDescription topicDescription = topicFuture.getValue().get();
-                existedTopicPartition.put(
-                    topicFuture.getKey(),
-                    topicDescription.partitions().size());
+                existedTopicPartition.put(topicName, topicDescription.partitions().size());
+                if (leaderNotAvailableTopics.contains(topicName)) {

Review comment:
       This `contains` check is unnecessary.
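
One plausible reading, sketched below. The hunk above is cut off before the guarded statement, so this assumes (without confirmation from the diff) that the body removes `topicName` from the set. Under that assumption the guard adds nothing, because `Set.remove` already does nothing and returns `false` when the element is absent. The class and method names are hypothetical.

```java
import java.util.Set;

// Hypothetical illustration; assumes the guarded statement is a remove().
final class RemoveWithoutContains {
    // Guarded form, mirroring the shape of the diff.
    static void guardedRemove(final Set<String> topics, final String topicName) {
        if (topics.contains(topicName)) {
            topics.remove(topicName);
        }
    }

    // Unguarded form: remove() is a no-op for absent elements.
    static void plainRemove(final Set<String> topics, final String topicName) {
        topics.remove(topicName);
    }
}
```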

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -59,6 +59,9 @@ private InternalAdminClientConfig(final Map<?, ?> props) {
 
     private final int retries;
     private final long retryBackOffMs;
+    private int remainingRetries;
+
+    private HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       Similarly for this field: it doesn't make sense for 
`leaderNotAvailableTopics` to stay non-empty after each call to `makeReady`. 
I would prefer building it as a local variable. cc @ableegoldman
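
A small sketch of the concern, using hypothetical names (`LocalSetSketch` and its two methods are not from the PR). A set held in a field carries entries from one call into the next, while a set built inside the method starts empty every time.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of field state leaking between calls.
final class LocalSetSketch {
    // Field version: entries added during one call are still present in the next.
    private final Set<String> leaderNotAvailableField = new HashSet<>();

    Set<String> makeReadyWithField(final Set<String> unavailableNow) {
        leaderNotAvailableField.addAll(unavailableNow);
        return new HashSet<>(leaderNotAvailableField);
    }

    // Local version: the set exists only for the duration of the call.
    static Set<String> makeReadyWithLocal(final Set<String> unavailableNow) {
        final Set<String> leaderNotAvailable = new HashSet<>(unavailableNow);
        return leaderNotAvailable;
    }
}
```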

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"

Review comment:
       This workaround is very hard for other developers to discover. At a 
minimum, we should define a constant for the topic name and make it part of 
the `MockAdminClient` class.
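
A rough sketch of what that could look like. `MockAdminSketch`, `describeTopic`, and the constant name `LEADER_NOT_AVAILABLE_TOPIC` are hypothetical, and `IllegalStateException` is only a dependency-free stand-in for the Kafka exception the real mock would use.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for MockAdminClient; not the real class.
final class MockAdminSketch {
    // Tests reference this constant instead of repeating a magic string.
    static final String LEADER_NOT_AVAILABLE_TOPIC = "LeaderNotAvailableTopic";

    CompletableFuture<String> describeTopic(final String topic) {
        final CompletableFuture<String> future = new CompletableFuture<>();
        if (LEADER_NOT_AVAILABLE_TOPIC.equals(topic)) {
            // Simulate the leader-not-available situation for the special topic name.
            future.completeExceptionally(
                new IllegalStateException("Leader not available for " + topic));
        } else {
            future.complete("description of " + topic);
        }
        return future;
    }
}
```

A test would then call `describeTopic(MockAdminSketch.LEADER_NOT_AVAILABLE_TOPIC)`, which makes the special behaviour discoverable from the mock's own API.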

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -59,6 +59,9 @@ private InternalAdminClientConfig(final Map<?, ?> props) {
 
     private final int retries;
     private final long retryBackOffMs;
+    private int remainingRetries;
+
+    private HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       This should be declared final.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,49 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() {
+        final String topicLeaderNotAvailable = "LeaderNotAvailableTopic";
+        mockAdminClient.addTopic(
+            false,
+            topicLeaderNotAvailable,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topicLeaderNotAvailable, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(topicLeaderNotAvailable, internalTopicConfig);
+
+        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
+            final StreamsException exception = assertThrows(
+                StreamsException.class,
+                () -> internalTopicManager.makeReady(topicConfigMap));
+
+            final String expectedMessage = "Could not create topics after 1 retries. This can happen if the Kafka cluster is temporary not available";

Review comment:
       Testing against the log message is error-prone and hard to maintain. I 
think just checking that the thrown exception has the expected type should be 
sufficient.
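
To illustrate checking only the exception type, here is a dependency-free sketch. `ExceptionTypeCheck.expectThrows` is a hypothetical, hand-rolled stand-in for JUnit's `assertThrows`, written out only so the example compiles on its own. It never inspects the message text.

```java
// Hypothetical helper mimicking the shape of JUnit's assertThrows.
final class ExceptionTypeCheck {
    static <T extends Throwable> T expectThrows(final Class<T> expected, final Runnable action) {
        try {
            action.run();
        } catch (final Throwable thrown) {
            if (expected.isInstance(thrown)) {
                return expected.cast(thrown); // type matches: test passes
            }
            throw new AssertionError(
                "Unexpected exception type: " + thrown.getClass().getName(), thrown);
        }
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }
}
```

With this shape the test keeps passing if the wording of the message changes, which is the maintenance benefit described above. In the PR's test, this would amount to keeping the `assertThrows(StreamsException.class, ...)` call and dropping the comparison against `expectedMessage`.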



