vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459827653

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());
+                }
+
+                storesToInitialize.add(stateStore);
+
+                final long currentWallClockMs = time.milliseconds();
+                if (deadlineMs == NO_DEADLINE) {
+                    final long newDeadlineMs = currentWallClockMs + taskTimeoutMs;
+                    deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
+                } else if (currentWallClockMs > deadlineMs) {
+                    throw new TimeoutException(String.format(
+                        "Global task did not make progress to restore state within %d ms. Adjust `task.timeout.ms` if needed.",
+                        currentWallClockMs - deadlineMs + taskTimeoutMs
+                    ));
+                }
+
+                log.debug(retryableException.getMessage() + " Will retry. Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs);

Review comment:
       Ah, good, at least it wasn't lack of clarity. It's your call. I've wasted quite a bit of time trying to track down root causes of things specifically because of stuff like this. It really doesn't seem like saving ten log lines is worth the extra hassle.
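A minimal standalone sketch of the round-robin retry-with-deadline loop from the hunk above. It also shows one way to act on the review comment, assuming "saving ten log lines" refers to the stack trace that is lost when only `getMessage()` is logged: the sketch passes the exception object itself to the logger. Everything here is hypothetical and illustrative, not Kafka's API. `Store`, `RetryableException`, `initAll` and `NO_DEADLINE = -1` stand in for `StateStore`, `RetryableErrorException` and the real constant. `java.util.logging` replaces the SLF4J logger used in Kafka. `IllegalStateException` stands in for `StreamsException`/`TimeoutException`, and it wraps the retryable exception itself rather than its `getCause()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; names are illustrative stand-ins, not Kafka classes.
public final class RoundRobinInit {
    private static final Logger LOG = Logger.getLogger(RoundRobinInit.class.getName());
    private static final long NO_DEADLINE = -1L; // assumed sentinel value

    /** Hypothetical stand-in for a store whose init may fail transiently. */
    public interface Store {
        String name();
        void init() throws RetryableException;
    }

    /** Hypothetical stand-in for a retryable error type. */
    public static final class RetryableException extends Exception {
        public RetryableException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static void initAll(List<Store> stores, long timeoutMs, LongSupplier clockMs) {
        Deque<Store> pending = new ArrayDeque<>(stores);
        long deadlineMs = NO_DEADLINE;
        while (!pending.isEmpty()) {
            Store store = pending.pollFirst();
            try {
                store.init();
                deadlineMs = NO_DEADLINE; // progress was made, so disarm the timer
            } catch (RetryableException e) {
                if (timeoutMs == 0L) {
                    throw new IllegalStateException(e); // retries disabled: fail fast
                }
                pending.addLast(store); // retry after the other pending stores
                long nowMs = clockMs.getAsLong();
                if (deadlineMs == NO_DEADLINE) {
                    long candidate = nowMs + timeoutMs;
                    // Guard against overflow, as the diff does.
                    deadlineMs = candidate < 0L ? Long.MAX_VALUE : candidate;
                } else if (nowMs > deadlineMs) {
                    throw new IllegalStateException(
                        "No progress within " + (nowMs - deadlineMs + timeoutMs) + " ms", e);
                }
                // Pass the exception itself, not just e.getMessage(), so the
                // stack trace and cause chain reach the log output.
                LOG.log(Level.FINE, "Will retry " + store.name()
                    + "; remaining ms: " + (deadlineMs - nowMs), e);
            }
        }
    }
}
```

The deadline is armed only on the first failure after progress and is reset by any successful init. A slow but advancing restore therefore never times out, while a store that keeps failing with no other store succeeding eventually does.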
