mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460305033
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
     log.info("Restoring state for global store {}", store.name());
     final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-    Map<TopicPartition, Long> highWatermarks = null;
-    int attempts = 0;
-    while (highWatermarks == null) {
-        try {
-            highWatermarks = globalConsumer.endOffsets(topicPartitions);
-        } catch (final TimeoutException retryableException) {
-            if (++attempts > retries) {
-                log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.",
-                    store.name(),
-                    retries,
-                    retryableException);
-                throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
-                    retryableException);
-            }
-            log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
-                topicPartitions,
-                retryBackoffMs,
-                attempts,
-                retries,
-                retryableException);
-            Utils.sleep(retryBackoffMs);
-        }
-    }
+    final Map<TopicPartition, Long> highWatermarks = new HashMap<>();
+    retryUntilSuccessOrThrowOnTaskTimeout(
+        () -> highWatermarks.putAll(globalConsumer.endOffsets(topicPartitions)),
Review comment:
Correct, we rely on the client's internal backoff behavior here. Happy to
switch to a `Supplier`.
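
For illustration, a minimal sketch of what a `Supplier`-based variant of such a retry helper could look like. This is a hypothetical standalone helper, not the actual `retryUntilSuccessOrThrowOnTaskTimeout` from the PR; the class name `RetryUtil`, the method name `retryUntilSuccessOrTimeout`, and the local `TimeoutException` placeholder (standing in for Kafka's `org.apache.kafka.common.errors.TimeoutException`) are all assumptions made for this sketch:

```java
import java.time.Duration;
import java.util.function.Supplier;

class RetryUtil {

    // Placeholder for org.apache.kafka.common.errors.TimeoutException,
    // so the sketch is self-contained.
    static class TimeoutException extends RuntimeException {
        TimeoutException(final String message) {
            super(message);
        }
    }

    // Hypothetical Supplier-based retry helper: re-invoke the supplier on
    // TimeoutException until a deadline elapses, then rethrow. As in the PR
    // discussion, no explicit sleep is added between attempts; the client's
    // internal backoff is relied upon.
    static <T> T retryUntilSuccessOrTimeout(final Supplier<T> action,
                                            final Duration taskTimeout) {
        final long deadline = System.currentTimeMillis() + taskTimeout.toMillis();
        while (true) {
            try {
                // Returning a value lets the caller avoid mutating captured
                // state (e.g. the HashMap in the diff above).
                return action.get();
            } catch (final TimeoutException timeoutException) {
                if (System.currentTimeMillis() >= deadline) {
                    // Give up once the task timeout is exceeded.
                    throw timeoutException;
                }
            }
        }
    }
}
```

With a `Supplier` that returns the offsets directly, the caller could write something like `final Map<TopicPartition, Long> highWatermarks = retryUntilSuccessOrTimeout(() -> globalConsumer.endOffsets(topicPartitions), timeout);`, sidestepping the mutable `HashMap` in the current diff.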
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]