vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458173651
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -184,32 +217,21 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        Map<TopicPartition, Long> highWatermarks = null;
-        int attempts = 0;
-        while (highWatermarks == null) {
-            try {
-                highWatermarks = globalConsumer.endOffsets(topicPartitions);
-            } catch (final TimeoutException retryableException) {
-                if (++attempts > retries) {
-                    log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.",
-                        store.name(),
-                        retries,
-                        retryableException);
-                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
-                        retryableException);
-                }
-                log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
-                    topicPartitions,
-                    retryBackoffMs,
-                    attempts,
-                    retries,
-                    retryableException);
-                Utils.sleep(retryBackoffMs);
-            }
+        final Map<TopicPartition, Long> highWatermarks;
+        try {
+            highWatermarks = globalConsumer.endOffsets(topicPartitions);
+        } catch (final TimeoutException retryableException) {
+            log.debug(
+                "Failed to get end offsets for partitions {}. The broker may be transiently unavailable at the moment. Will retry.",
+                topicPartitions,
+                retryableException

Review comment:
The varargs version of `debug` does _not_ take a cause at the end, so this exception will not be logged. You have to use the overload that takes only `(String, Throwable)`.
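For illustration only (this is not code from the PR): a minimal sketch of the suggested fix, pre-formatting the message so the exception can be passed to the `(String, Throwable)` overload. It reuses the `log`, `topicPartitions`, and `retryableException` variables from the hunk above.

```java
// Sketch: format the message eagerly and hand the exception to the
// two-argument debug(String, Throwable) overload, so the stack trace is logged.
log.debug(
    String.format(
        "Failed to get end offsets for partitions %s. " +
            "The broker may be transiently unavailable at the moment. Will retry.",
        topicPartitions
    ),
    retryableException
);
```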
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -184,32 +217,21 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        Map<TopicPartition, Long> highWatermarks = null;
-        int attempts = 0;
-        while (highWatermarks == null) {
-            try {
-                highWatermarks = globalConsumer.endOffsets(topicPartitions);
-            } catch (final TimeoutException retryableException) {
-                if (++attempts > retries) {
-                    log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.",
-                        store.name(),
-                        retries,
-                        retryableException);
-                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
-                        retryableException);
-                }
-                log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
-                    topicPartitions,
-                    retryBackoffMs,
-                    attempts,
-                    retries,
-                    retryableException);
-                Utils.sleep(retryBackoffMs);
-            }
+        final Map<TopicPartition, Long> highWatermarks;
+        try {
+            highWatermarks = globalConsumer.endOffsets(topicPartitions);
+        } catch (final TimeoutException retryableException) {
+            log.debug(
+                "Failed to get end offsets for partitions {}. The broker may be transiently unavailable at the moment. Will retry.",
+                topicPartitions,
+                retryableException
+            );
+
+            // handled in `GlobalStateMangerImpl#initialize()`
+            throw retryableException;

Review comment:
Minor note: it's confusing to track down exceptions when they are re-thrown like this, because the stack trace would only reference L223. Even though there is a small performance penalty, it's better for maintainability to always throw a new exception, like `new RetryableException(timeoutException)`.

In this particular case, it may be more appropriate just to remove the try-catch and add a `throws` declaration. There are two things that make me think this:

1. The log message here says "will retry", but this method has no idea whether or not it will actually be retried.
2. The calling method has a comment that says this method logs the error, which is also an assumption that may not survive refactoring.

It seems like we can resolve all three of these maintenance problems by just moving the log message to the caller. This feedback also applies elsewhere.
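To make the "remove the try-catch and log in the caller" option concrete, here is a hypothetical, self-contained sketch; the class and method names are invented for illustration and the `task.timeout.ms` bookkeeping is elided, so this is not the PR's actual code.

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EndOffsetFetchSketch {
    private static final Logger LOG = LoggerFactory.getLogger(EndOffsetFetchSketch.class);

    // The low-level helper no longer catches, logs, or re-throws. TimeoutException
    // is unchecked, so the `throws` clause here is documentation for the caller.
    Map<TopicPartition, Long> fetchEndOffsets(final Consumer<byte[], byte[]> consumer,
                                              final List<TopicPartition> partitions) throws TimeoutException {
        return consumer.endOffsets(partitions);
    }

    // The caller owns both the log message and the retry decision, so the
    // "Will retry" wording is accurate by construction.
    Map<TopicPartition, Long> fetchEndOffsetsWithRetry(final Consumer<byte[], byte[]> consumer,
                                                       final List<TopicPartition> partitions) {
        while (true) {
            try {
                return fetchEndOffsets(consumer, partitions);
            } catch (final TimeoutException timeoutException) {
                LOG.debug("Failed to get end offsets for partitions " + partitions
                        + ". The broker may be transiently unavailable at the moment. Will retry.",
                    timeoutException);
                // A real implementation would check the task timeout here and
                // re-throw (wrapped) once it expires, instead of looping forever.
            }
        }
    }
}
```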
##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##########
@@ -613,69 +617,262 @@ public boolean lockGlobalState() throws IOException {
     }
 
     @Test
-    public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
-        final int retries = 2;
+    public void shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
         consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
-            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
+            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
                 numberOfCalls.incrementAndGet();
-                throw new TimeoutException();
+                throw new TimeoutException("KABOOM!");
             }
         };
+        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
+
         streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-                put(StreamsConfig.RETRIES_CONFIG, retries);
+                put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 0L);
             }
         });
-        try {
-            new GlobalStateManagerImpl(
-                new LogContext("mock"),
-                topology,
-                consumer,
-                stateDirectory,
-                stateRestoreListener,
-                streamsConfig);
-        } catch (final StreamsException expected) {
-            assertEquals(numberOfCalls.get(), retries);
-        }
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            time,
+            topology,
+            consumer,
+            stateDirectory,
+            stateRestoreListener,
+            streamsConfig
+        );
+        processorContext.setStateManger(stateManager);
+        stateManager.setGlobalProcessorContext(processorContext);
+
+        final StreamsException expected = assertThrows(
+            StreamsException.class,
+            () -> stateManager.initialize()
+        );
+        final Throwable cause = expected.getCause();
+        assertThat(cause, instanceOf(TimeoutException.class));
+        assertThat(cause.getMessage(), equalTo("KABOOM!"));
+
+        assertEquals(numberOfCalls.get(), 1);
     }
 
     @Test
-    public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
-        final int retries = 2;
+    public void shouldRetryAtLeastOnceWhenEndOffsetsThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
         consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
-            public synchronized List<PartitionInfo> partitionsFor(final String topic) {
+            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+                time.sleep(100L);
                 numberOfCalls.incrementAndGet();
-                throw new TimeoutException();
+                throw new TimeoutException("KABOOM!");
            }
         };
+        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
+        consumer.updatePartitions(t2.topic(), Collections.singletonList(new PartitionInfo(t2.topic(), t2.partition(), null, null, null)));
+
         streamsConfig = new StreamsConfig(new Properties() {
             {
                 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
                 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
                 put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-                put(StreamsConfig.RETRIES_CONFIG, retries);
+                put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
             }
         });
-        try {
-            new GlobalStateManagerImpl(
-                new LogContext("mock"),
-                topology,
-                consumer,
-                stateDirectory,
-                stateRestoreListener,
-                streamsConfig);
-        } catch (final StreamsException expected) {
-            assertEquals(numberOfCalls.get(), retries);
-        }
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            time,
+            topology,
+            consumer,
+            stateDirectory,
+            stateRestoreListener,
+            streamsConfig
+        );
+        processorContext.setStateManger(stateManager);
+        stateManager.setGlobalProcessorContext(processorContext);
+
+        final TimeoutException expected = assertThrows(
+            TimeoutException.class,
+            () -> stateManager.initialize()
+        );
+        assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
+
+        assertEquals(numberOfCalls.get(), 2);
+    }
+
+    @Test
+    public void shouldRetryWhenEndOffsetsThrowsTimeoutExceptionUntilTaskTimeoutExpired() {
+        final AtomicInteger numberOfCalls = new AtomicInteger(0);
+        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
+                time.sleep(100L);
+                numberOfCalls.incrementAndGet();
+                throw new TimeoutException("KABOOM!");
+            }
+        };
+        consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
+        consumer.updatePartitions(t2.topic(), Collections.singletonList(new PartitionInfo(t2.topic(), t2.partition(), null, null, null)));
+        consumer.updatePartitions(t3.topic(), Collections.singletonList(new PartitionInfo(t3.topic(), t3.partition(), null, null, null)));
+        consumer.updatePartitions(t4.topic(), Collections.singletonList(new PartitionInfo(t4.topic(), t4.partition(), null, null, null)));
+
+        streamsConfig = new StreamsConfig(new Properties() {
+            {

Review comment:
Double-brace initialization is an anti-pattern. It would be preferable to use `mkProperties`.
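As a concrete illustration of the `mkProperties` suggestion, here is a sketch of how the double-brace `Properties` block could be rewritten, assuming the `mkProperties`/`mkMap`/`mkEntry` helpers from `org.apache.kafka.common.utils.Utils`; the config values simply mirror the first test above and are passed as strings.

```java
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;

// Sketch: flat helper calls instead of an anonymous Properties subclass with an
// instance initializer (the "double-brace" idiom).
streamsConfig = new StreamsConfig(mkProperties(mkMap(
    mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
    mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
    mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
    mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, "0")
)));
```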