mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459790103



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());
+                }
+
+                storesToInitialize.add(stateStore);
+
+                final long currentWallClockMs = time.milliseconds();
+                if (deadlineMs == NO_DEADLINE) {
+                    final long newDeadlineMs = currentWallClockMs + taskTimeoutMs;
+                    deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
+                } else if (currentWallClockMs > deadlineMs) {
+                    throw new TimeoutException(String.format(
+                        "Global task did not make progress to restore state within %d ms. Adjust `task.timeout.ms` if needed.",
+                        currentWallClockMs - deadlineMs + taskTimeoutMs
+                    ));
+                }
+
+                log.debug(retryableException.getMessage() + " Will retry. Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs);

Review comment:
       I understood what you meant, but I thought that because we actually do
retry, it's better/cleaner/less noisy not to log the full stack trace?
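
For readers without the full change at hand, the retry behavior this comment refers to can be sketched in isolation roughly as follows. This is only an illustration under stated assumptions, not the PR's code: `RetryDeadlineSketch`, `computeDeadline`, and `initAll` are hypothetical names, a `LongSupplier` stands in for the `time.milliseconds()` call, a `Predicate` stands in for `stateStore.init(...)` (returning `false` where the real code would catch `RetryableErrorException`), and `IllegalStateException` stands in for the `StreamsException`/`TimeoutException` thrown in the diff. Logging is left out entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

public class RetryDeadlineSketch {
    static final long NO_DEADLINE = -1L;

    /** Returns nowMs + timeoutMs, clamped to Long.MAX_VALUE if the sum overflows. */
    static long computeDeadline(final long nowMs, final long timeoutMs) {
        final long deadlineMs = nowMs + timeoutMs;
        return deadlineMs < 0L ? Long.MAX_VALUE : deadlineMs;
    }

    /**
     * Round-robins through the items until tryInit has succeeded for each one,
     * and returns the total number of attempts. Hypothetical stand-in for the
     * store initialization loop in the diff above.
     */
    static <T> int initAll(final List<T> items,
                           final Predicate<T> tryInit,
                           final LongSupplier clock,
                           final long timeoutMs) {
        final Deque<T> pending = new ArrayDeque<>(items);
        long deadlineMs = NO_DEADLINE;
        int attempts = 0;

        while (!pending.isEmpty()) {
            final T item = pending.removeFirst();
            attempts++;

            if (tryInit.test(item)) {
                // any successful init counts as progress and clears the deadline
                deadlineMs = NO_DEADLINE;
                continue;
            }

            if (timeoutMs == 0L) {
                // a timeout of zero disables retries: fail on the first error
                throw new IllegalStateException("retries disabled");
            }

            // re-queue at the tail so the other items get their turn first
            pending.addLast(item);

            final long nowMs = clock.getAsLong();
            if (deadlineMs == NO_DEADLINE) {
                // first failure since the last progress: start the timer
                deadlineMs = computeDeadline(nowMs, timeoutMs);
            } else if (nowMs > deadlineMs) {
                throw new IllegalStateException(
                    "no progress within " + (nowMs - deadlineMs + timeoutMs) + " ms");
            }
        }
        return attempts;
    }
}
```

In this sketch the deadline measures time without progress, not total time: each successful initialization resets it, so a slow but advancing restore would not time out.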




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

