mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460305033



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -185,32 +200,16 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
 
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        Map<TopicPartition, Long> highWatermarks = null;
 
-        int attempts = 0;
-        while (highWatermarks == null) {
-            try {
-                highWatermarks = globalConsumer.endOffsets(topicPartitions);
-            } catch (final TimeoutException retryableException) {
-                if (++attempts > retries) {
-                    log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
-                        "You can increase the number of retries via configuration parameter `retries`.",
-                        store.name(),
-                        retries,
-                        retryableException);
-                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
-                            "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
-                        retryableException);
-                }
-                log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
-                    topicPartitions,
-                    retryBackoffMs,
-                    attempts,
-                    retries,
-                    retryableException);
-                Utils.sleep(retryBackoffMs);
-            }
-        }
+        final Map<TopicPartition, Long> highWatermarks = new HashMap<>();
+        retryUntilSuccessOrThrowOnTaskTimeout(
+            () -> highWatermarks.putAll(globalConsumer.endOffsets(topicPartitions)),

Review comment:
       Correct, we rely on the client's internal backoff behavior here. Happy to switch to a `Supplier`.
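
       For illustration, a `Supplier`-based variant might look roughly like the sketch below. It is not the code in this PR: the class name, the helper `retryUntilSuccessOrDeadline`, its `deadlineMs` parameter, and the `RetryableTimeoutException` stand-in for Kafka's `TimeoutException` are all assumptions made so the example runs on its own. The helper returns the supplied value directly, so the caller no longer needs a mutable map and `putAll`, and it does not sleep between attempts because the supplied call is assumed to block and back off internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SupplierRetrySketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static class RetryableTimeoutException extends RuntimeException {
        RetryableTimeoutException(final String message) {
            super(message);
        }
    }

    /**
     * Hypothetical helper: calls the supplier until it succeeds or the deadline passes.
     * There is no explicit sleep; the supplied call is assumed to back off internally.
     */
    static <T> T retryUntilSuccessOrDeadline(final Supplier<T> supplier, final long deadlineMs) {
        while (true) {
            try {
                return supplier.get();
            } catch (final RetryableTimeoutException retryableException) {
                if (System.currentTimeMillis() >= deadlineMs) {
                    throw new IllegalStateException(
                        "Gave up retrying after the deadline passed", retryableException);
                }
                // Otherwise loop and try again.
            }
        }
    }

    public static void main(final String[] args) {
        final int[] calls = {0};

        // Simulated endOffsets() call that times out twice before succeeding.
        final Supplier<Map<String, Long>> endOffsets = () -> {
            if (++calls[0] < 3) {
                throw new RetryableTimeoutException("simulated timeout");
            }
            final Map<String, Long> offsets = new HashMap<>();
            offsets.put("topic-0", 42L);
            return offsets;
        };

        final Map<String, Long> highWatermarks =
            retryUntilSuccessOrDeadline(endOffsets, System.currentTimeMillis() + 5_000L);

        System.out.println(highWatermarks + " after " + calls[0] + " calls");
    }
}
```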




