vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459827653
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##########
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
+
+        long deadlineMs = NO_DEADLINE;
+        final List<StateStore> storesToInitialize = new LinkedList<>(globalStateStores);
+
+        while (!storesToInitialize.isEmpty()) {
+            // we remove and add back on failure to round-robin through all stores
+            final StateStore stateStore = storesToInitialize.remove(0);
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+
+            try {
+                stateStore.init(globalProcessorContext, stateStore);
+                deadlineMs = NO_DEADLINE;
+            } catch (final RetryableErrorException retryableException) {
+                if (taskTimeoutMs == 0L) {
+                    throw new StreamsException(retryableException.getCause());
+                }
+
+                storesToInitialize.add(stateStore);
+
+                final long currentWallClockMs = time.milliseconds();
+                if (deadlineMs == NO_DEADLINE) {
+                    final long newDeadlineMs = currentWallClockMs + taskTimeoutMs;
+                    deadlineMs = newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
+                } else if (currentWallClockMs > deadlineMs) {
+                    throw new TimeoutException(String.format(
+                        "Global task did not make progress to restore state within %d ms. Adjust `task.timeout.ms` if needed.",
+                        currentWallClockMs - deadlineMs + taskTimeoutMs
+                    ));
+                }
+
+                log.debug(retryableException.getMessage() + " Will retry. Remaining time in milliseconds: {}", deadlineMs - currentWallClockMs);

Review comment:
Ah, good, at least it wasn't a lack of clarity. It's your call. I've wasted quite a bit of time tracking down root causes specifically because of logging like this.
It really doesn't seem like saving ten log lines is worth the extra hassle. I'm not saying the way you proposed it is "bad," but it is unusual, and unusual code is more often than not a bummer in the long run. Exceptions in Java take a "cause" for a reason: they print the stack traces of those causes in a well-understood order and format that integrates with all kinds of tools, from IDEs to analytics and log aggregation. I agree that the stack trace isn't that useful in this case. Still, breaking compatibility with that whole ecosystem just because we think the stack trace is unlikely to be useful seems like a poor tradeoff.
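The retry loop in the diff round-robins through the global stores and gives up only once `task.timeout.ms` has elapsed since the first failure with no store initializing successfully in between. A minimal standalone sketch of that pattern follows. All type names (`InitializableStore`, `FlakyStore`, `RetryableInitException`, `InitTimeoutException`, `RoundRobinInitializer`) are hypothetical stand-ins, not Kafka Streams APIs, and the clock is injected as a `LongSupplier` so the logic can be tested. Unlike the diff, the sketch attaches the last retryable exception as the cause of the timeout, in the spirit of the review comment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for RetryableErrorException.
class RetryableInitException extends RuntimeException {
    RetryableInitException(String message) { super(message); }
}

// Hypothetical stand-in for the TimeoutException in the diff; keeps the last failure as its cause.
class InitTimeoutException extends RuntimeException {
    InitTimeoutException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical minimal store abstraction.
interface InitializableStore {
    String name();
    void init(); // may throw RetryableInitException
}

// Simulated store that fails its first `failures` init() calls, then succeeds.
class FlakyStore implements InitializableStore {
    private final String name;
    private final int failures;
    private int attempts;

    FlakyStore(String name, int failures) { this.name = name; this.failures = failures; }

    public String name() { return name; }

    public void init() {
        if (attempts++ < failures) {
            throw new RetryableInitException(name + " not ready (attempt " + attempts + ")");
        }
    }
}

class RoundRobinInitializer {
    private static final long NO_DEADLINE = -1L; // hypothetical sentinel value

    // Returns store names in the order they finished initializing.
    static List<String> initAll(List<? extends InitializableStore> stores, long taskTimeoutMs, LongSupplier clockMs) {
        Deque<InitializableStore> pending = new ArrayDeque<>(stores);
        List<String> initialized = new ArrayList<>();
        long deadlineMs = NO_DEADLINE;

        while (!pending.isEmpty()) {
            InitializableStore store = pending.pollFirst();
            try {
                store.init();
                initialized.add(store.name());
                deadlineMs = NO_DEADLINE; // progress was made, so reset the timer
            } catch (RetryableInitException e) {
                if (taskTimeoutMs == 0L) {
                    // Retries disabled; IllegalStateException stands in for StreamsException.
                    throw new IllegalStateException("Store initialization failed", e);
                }
                pending.addLast(store); // move the failed store to the back of the queue
                long nowMs = clockMs.getAsLong();
                if (deadlineMs == NO_DEADLINE) {
                    long candidateMs = nowMs + taskTimeoutMs;
                    deadlineMs = candidateMs < 0L ? Long.MAX_VALUE : candidateMs; // guard against overflow
                } else if (nowMs > deadlineMs) {
                    throw new InitTimeoutException("No store made progress within " + taskTimeoutMs + " ms", e);
                }
            }
        }
        return initialized;
    }
}
```

With stores `A` (fails twice) and `B` (never fails), the round-robin order means `B` finishes first and `A` is retried until it succeeds, so `initAll` returns `[B, A]`.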
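To make the reviewer's point concrete: when only `getMessage()` is concatenated into the log line, the stack trace and the whole cause chain disappear, whereas handing the exception itself to the output lets the standard JDK format print the stack trace followed by a `Caused by:` section for each cause. The sketch below relies only on `Throwable.printStackTrace(PrintWriter)` so it runs without a logging library; `CauseChainDemo`, the store name, and the `IOException` root cause are hypothetical. Assuming `log` is an SLF4J logger (as the `{}` placeholder suggests), the equivalent fix is to pass the exception as the last argument, e.g. `log.debug("Will retry. Remaining time in milliseconds: {}", remainingMs, retryableException)`, which SLF4J treats as the throwable to log.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical demo class; not part of Kafka Streams.
class CauseChainDemo {
    // Builds a wrapper exception whose cause is a (hypothetical) root failure.
    static RuntimeException retryableFailure() {
        IOException root = new IOException("broker unavailable");
        return new RuntimeException("Failed to initialize store 'global-store'.", root);
    }

    // Message-only style, as in the diff: the stack trace and cause chain are dropped.
    static String messageOnly(Throwable t, long remainingMs) {
        return t.getMessage() + " Will retry. Remaining time in milliseconds: " + remainingMs;
    }

    // Throwable-preserving style: the standard JDK format prints the stack trace,
    // then a "Caused by:" section for each cause in the chain.
    static String withStackTrace(Throwable t, long remainingMs) {
        StringWriter buffer = new StringWriter();
        PrintWriter out = new PrintWriter(buffer);
        out.println("Will retry. Remaining time in milliseconds: " + remainingMs);
        t.printStackTrace(out);
        out.flush();
        return buffer.toString();
    }
}
```

The second form costs a handful of extra lines per retry, but those lines are in the format that IDEs and log-aggregation tools already know how to parse.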