mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460305033
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ########## @@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta log.info("Restoring state for global store {}", store.name()); final List<TopicPartition> topicPartitions = topicPartitionsForStore(store); - Map<TopicPartition, Long> highWatermarks = null; - int attempts = 0; - while (highWatermarks == null) { - try { - highWatermarks = globalConsumer.endOffsets(topicPartitions); - } catch (final TimeoutException retryableException) { - if (++attempts > retries) { - log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " + - "You can increase the number of retries via configuration parameter `retries`.", - store.name(), - retries, - retryableException); - throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " + - "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries), - retryableException); - } - log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})", - topicPartitions, - retryBackoffMs, - attempts, - retries, - retryableException); - Utils.sleep(retryBackoffMs); - } - } + final Map<TopicPartition, Long> highWatermarks = new HashMap<>(); + retryUntilSuccessOrThrowOnTaskTimeout( + () -> highWatermarks.putAll(globalConsumer.endOffsets(topicPartitions)), Review comment: Correct, we rely on the client's internal backoff behavior here. Happy to switch to a `Supplier`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org