vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459827653



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());
+                }
+
+                storesToInitialize.add(stateStore);
+
+                final long currentWallClockMs = time.milliseconds();
+                if (deadlineMs == NO_DEADLINE) {
+                    final long newDeadlineMs = currentWallClockMs + taskTimeoutMs;
+                    deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
+                } else if (currentWallClockMs > deadlineMs) {
+                    throw new TimeoutException(String.format(
+                        "Global task did not make progress to restore state within %d ms. Adjust `task.timeout.ms` if needed.",
+                        currentWallClockMs - deadlineMs + taskTimeoutMs
+                    ));
+                }
+
+                log.debug(retryableException.getMessage() + " Will retry. Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs);

Review comment:
   Ah, good, at least it wasn't a lack of clarity. It's your call, but I've wasted quite a bit of time tracking down root causes precisely because of logging like this. Saving ten log lines really doesn't seem worth the extra hassle.
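   The difference between logging only the message and logging the exception itself is visible in what the logger actually receives. A minimal sketch, using `java.util.logging` only because it ships with the JDK (Kafka logs through SLF4J); the class, logger name, and capturing handler are hypothetical. The first call mirrors the message-only style of the hunk; the second attaches the exception via `Logger.log(Level, String, Throwable)`, so handlers and formatters can print its full stack trace and cause chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class LogWithCause {

    /** Collects records in memory so we can inspect what the logger received. */
    public static class CapturingHandler extends Handler {
        public final List<LogRecord> records = new ArrayList<>();
        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() {}
        @Override public void close() {}
    }

    public static CapturingHandler demo(Exception failure, long remainingMs) {
        Logger log = Logger.getLogger("global-state-demo"); // hypothetical logger name
        log.setUseParentHandlers(false); // keep the demo off the console
        log.setLevel(Level.ALL);
        CapturingHandler handler = new CapturingHandler();
        handler.setLevel(Level.ALL);
        log.addHandler(handler);

        // Message only: the text survives, the stack trace and cause chain do not.
        log.fine(failure.getMessage() + " Will retry. Remaining time in milliseconds: " + remainingMs);

        // Throwable attached: the record carries the exception itself.
        log.log(Level.FINE, "Will retry. Remaining time in milliseconds: " + remainingMs, failure);

        log.removeHandler(handler);
        return handler;
    }
}
```

   With SLF4J, the analogous change is to pass `retryableException` as the final argument after the placeholder values; SLF4J (1.6.0 and later) treats a trailing throwable that no `{}` consumes as the exception to log.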
   
   I'm not saying the way you proposed is "bad". But it is unusual, and unusual code is more often than not a bummer in the long run. Exceptions in Java take a "cause" for a reason: the stack traces of those causes are printed in a well-understood order and format that integrates with all kinds of tools, from IDEs to analytics and log aggregation. I agree that the stack trace isn't that useful in this case. Still, deliberately breaking that whole ecosystem because we don't expect the stack trace to be useful seems a poor tradeoff.
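   The "well understood order and format" is the `Caused by:` section that `Throwable.printStackTrace` emits for each chained cause. A minimal sketch contrasting the two wrapping styles; `CauseChainDemo` and its methods are hypothetical, and `RuntimeException` stands in for Kafka's own exception types.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class CauseChainDemo {

    /** Renders a throwable exactly as printStackTrace() would, but into a String. */
    public static String render(Throwable t) {
        StringWriter out = new StringWriter();
        t.printStackTrace(new PrintWriter(out, true));
        return out.toString();
    }

    /** Wraps the failure as the cause: the root exception and its trace stay attached. */
    public static RuntimeException wrapWithCause(Exception failure) {
        return new RuntimeException("Global task did not make progress", failure);
    }

    /** Copies only the message: the original exception and its trace are lost. */
    public static RuntimeException wrapMessageOnly(Exception failure) {
        return new RuntimeException("Global task did not make progress: " + failure.getMessage());
    }
}
```

   Both variants keep the text "store not ready", but only the chained one still tells the reader (and any log-aggregation tool) which exception type was thrown and where.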




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
