vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458173651



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -184,32 +217,21 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
 
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        Map<TopicPartition, Long> highWatermarks = null;
 
-        int attempts = 0;
-        while (highWatermarks == null) {
-            try {
-                highWatermarks = globalConsumer.endOffsets(topicPartitions);
-            } catch (final TimeoutException retryableException) {
-                if (++attempts > retries) {
-                    log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.",
-                        store.name(),
-                        retries,
-                        retryableException);
-                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
-                            "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
-                        retryableException);
-                }
-                log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
-                    topicPartitions,
-                    retryBackoffMs,
-                    attempts,
-                    retries,
-                    retryableException);
-                Utils.sleep(retryBackoffMs);
-            }
+        final Map<TopicPartition, Long> highWatermarks;
+        try {
+            highWatermarks = globalConsumer.endOffsets(topicPartitions);
+        } catch (final TimeoutException retryableException) {
+            log.debug(
+                "Failed to get end offsets for partitions {}. The broker may be transiently unavailable at the moment. Will retry.",
+                topicPartitions,
+                retryableException

Review comment:
       The varargs version of `debug` does _not_ take a cause at the end. This exception will not be logged. You have to use the overload that takes only `(String, Exception)`.
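
   A minimal, self-contained sketch of the fix being suggested: pre-format the message and pass the exception as the sole trailing argument, so the `(String, Throwable)` overload is unambiguously selected and the stack trace is recorded. The `debug` method below is a stand-in for the real SLF4J logger, and the message text merely mirrors the diff above.

```java
import java.util.Arrays;
import java.util.List;

public class DebugLogSketch {
    // Stand-in for the (String, Throwable) logger overload; a real call
    // site would be log.debug(message, exception).
    static String lastMessage;
    static Throwable lastCause;

    static void debug(final String message, final Throwable cause) {
        lastMessage = message;
        lastCause = cause;
    }

    public static void main(final String[] args) {
        final List<String> topicPartitions = Arrays.asList("topic-0", "topic-1");
        final Exception retryableException = new RuntimeException("KABOOM!");

        // Pre-format the message so the exception is unambiguously the
        // cause argument rather than one more substitution parameter:
        debug(
            String.format("Failed to get end offsets for partitions %s.", topicPartitions),
            retryableException
        );

        System.out.println(lastMessage + " cause=" + lastCause.getMessage());
    }
}
```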

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -184,32 +217,21 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
 
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        Map<TopicPartition, Long> highWatermarks = null;
 
-        int attempts = 0;
-        while (highWatermarks == null) {
-            try {
-                highWatermarks = globalConsumer.endOffsets(topicPartitions);
-            } catch (final TimeoutException retryableException) {
-                if (++attempts > retries) {
-                    log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.",
-                        store.name(),
-                        retries,
-                        retryableException);
-                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
-                            "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
-                        retryableException);
-                }
-                log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
-                    topicPartitions,
-                    retryBackoffMs,
-                    attempts,
-                    retries,
-                    retryableException);
-                Utils.sleep(retryBackoffMs);
-            }
+        final Map<TopicPartition, Long> highWatermarks;
+        try {
+            highWatermarks = globalConsumer.endOffsets(topicPartitions);
+        } catch (final TimeoutException retryableException) {
+            log.debug(
+                "Failed to get end offsets for partitions {}. The broker may be transiently unavailable at the moment. Will retry.",
+                topicPartitions,
+                retryableException
+            );
+
+            // handled in `GlobalStateMangerImpl#initialize()`
+            throw retryableException;

Review comment:
       Minor note: it's confusing to track down exceptions when they are re-thrown like this, because the stack trace would only reference L223. Even though there is a small performance penalty, it's better for maintainability to always throw a new exception like `new RetryableException(timeoutException)`.
   
   In this particular case, it may be more appropriate just to remove the try-catch and add a `throws` declaration. There are two things that make me think this:
   1. The log message here says, "will retry", but this method can have no idea whether or not it'll be retried
   2. The calling method has a comment that says this method logs the error, which is also an assumption that may not survive refactoring
   
   It seems like we can resolve all three of these maintenance problems by just moving the log message to the caller.
   
   This feedback also applies elsewhere.
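   
   To illustrate the wrap-and-rethrow suggestion, here is a hedged, self-contained sketch. `RetryableException` is a hypothetical name (the real codebase would pick its own wrapper type), and `endOffsets` merely stands in for the consumer call that can time out.

```java
public class RethrowSketch {
    // Hypothetical wrapper type; the real code would use its own exception class.
    static class RetryableException extends RuntimeException {
        RetryableException(final Throwable cause) {
            super(cause);
        }
    }

    // Stands in for globalConsumer.endOffsets(...) timing out.
    static void endOffsets() {
        throw new RuntimeException("KABOOM!");
    }

    public static void main(final String[] args) {
        try {
            endOffsets();
        } catch (final RuntimeException timeoutException) {
            // Wrapping instead of rethrowing: the wrapper's stack trace
            // points at this line, while getCause() still carries the
            // original failure for diagnosis.
            final RetryableException wrapped = new RetryableException(timeoutException);
            System.out.println("cause=" + wrapped.getCause().getMessage());
        }
    }
}
```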
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##########
@@ -613,69 +617,262 @@ public boolean lockGlobalState() throws IOException {
     }
 
     @Test
-    public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
-        final int retries = 2;
+    public void shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
         consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
-            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
+            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
                 numberOfCalls.incrementAndGet();
-                throw new TimeoutException();
+                throw new TimeoutException("KABOOM!");
             }
         };
+        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
+
         streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-                put(StreamsConfig.RETRIES_CONFIG, retries);
+                put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 0L);
             }
         });
 
-        try {
-            new GlobalStateManagerImpl(
-                new LogContext("mock"),
-                topology,
-                consumer,
-                stateDirectory,
-                stateRestoreListener,
-                streamsConfig);
-        } catch (final StreamsException expected) {
-            assertEquals(numberOfCalls.get(), retries);
-        }
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            time,
+            topology,
+            consumer,
+            stateDirectory,
+            stateRestoreListener,
+            streamsConfig
+        );
+        processorContext.setStateManger(stateManager);
+        stateManager.setGlobalProcessorContext(processorContext);
+
+        final StreamsException expected = assertThrows(
+            StreamsException.class,
+            () -> stateManager.initialize()
+        );
+        final Throwable cause = expected.getCause();
+        assertThat(cause, instanceOf(TimeoutException.class));
+        assertThat(cause.getMessage(), equalTo("KABOOM!"));
+
+        assertEquals(numberOfCalls.get(), 1);
     }
 
     @Test
-    public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
-        final int retries = 2;
+    public void shouldRetryAtLeastOnceWhenEndOffsetsThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
         consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
-            public synchronized List<PartitionInfo> partitionsFor(final String topic) {
+            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+                time.sleep(100L);
                 numberOfCalls.incrementAndGet();
-                throw new TimeoutException();
+                throw new TimeoutException("KABOOM!");
             }
         };
+        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
+        consumer.updatePartitions(t2.topic(), Collections.singletonList(new PartitionInfo(t2.topic(), t2.partition(), null, null, null)));
+
         streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-                put(StreamsConfig.RETRIES_CONFIG, retries);
+                put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
             }
         });
 
-        try {
-            new GlobalStateManagerImpl(
-                new LogContext("mock"),
-                topology,
-                consumer,
-                stateDirectory,
-                stateRestoreListener,
-                streamsConfig);
-        } catch (final StreamsException expected) {
-            assertEquals(numberOfCalls.get(), retries);
-        }
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            time,
+            topology,
+            consumer,
+            stateDirectory,
+            stateRestoreListener,
+            streamsConfig
+        );
+        processorContext.setStateManger(stateManager);
+        stateManager.setGlobalProcessorContext(processorContext);
+
+        final TimeoutException expected = assertThrows(
+            TimeoutException.class,
+            () -> stateManager.initialize()
+        );
+        assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
+
+        assertEquals(numberOfCalls.get(), 2);
+    }
+
+    @Test
+    public void shouldRetryWhenEndOffsetsThrowsTimeoutExceptionUntilTaskTimeoutExpired() {
+        final AtomicInteger numberOfCalls = new AtomicInteger(0);
+        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+                time.sleep(100L);
+                numberOfCalls.incrementAndGet();
+                throw new TimeoutException("KABOOM!");
+            }
+        };
+        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
+        consumer.updatePartitions(t2.topic(), Collections.singletonList(new PartitionInfo(t2.topic(), t2.partition(), null, null, null)));
+        consumer.updatePartitions(t3.topic(), Collections.singletonList(new PartitionInfo(t3.topic(), t3.partition(), null, null, null)));
+        consumer.updatePartitions(t4.topic(), Collections.singletonList(new PartitionInfo(t4.topic(), t4.partition(), null, null, null)));
+
+        streamsConfig = new StreamsConfig(new Properties() {
+            {
Review comment:
       Double-brace initialization is an anti-pattern. It would be preferable to use `mkProperties`.
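
   For reference, a self-contained sketch of that alternative. The real helper is `org.apache.kafka.common.utils.Utils.mkProperties(Map<String, String>)`; it is approximated here with a local method (and `Map.of` in place of `mkMap`/`mkEntry`) so the example runs on its own.

```java
import java.util.Map;
import java.util.Properties;

public class PropertiesSketch {
    // Local approximation of Utils.mkProperties(Map<String, String>).
    static Properties mkProperties(final Map<String, String> entries) {
        final Properties props = new Properties();
        props.putAll(entries);
        return props;
    }

    public static void main(final String[] args) {
        // Avoids the anonymous Properties subclass that double-brace
        // initialization (`new Properties() {{ ... }}`) would create.
        final Properties streamsProps = mkProperties(Map.of(
            "application.id", "appId",
            "bootstrap.servers", "dummy:1234"
        ));
        System.out.println(streamsProps.getProperty("application.id"));
    }
}
```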




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
