mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r461302625
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                                   final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
Thanks for digging into it @vvcephei -- The question about `pollTimeout` is fair. I guess for default configs it might not matter much, as the default is 100ms IIRC.
At the same time, we apply the same `pollTimeout` during regular processing, and as a matter of fact, for this case, a user might want to use a longer poll timeout, as otherwise the thread would just "busy wait" anyway (only the responsiveness to a shutdown of the app needs to be considered). Thus, it might actually make sense to exclude `pollTimeout` completely and only use `requestTimeout + taskTimeout`. Again, using `taskTimeout` in `poll()` reduces the responsiveness of a shutdown -- however, atm we ignore a shutdown signal during bootstrapping anyway; hence, for now we don't make the situation worse. I created a ticket to fix this: https://issues.apache.org/jira/browse/KAFKA-10317 and will add a comment for now.

Short related note: using `requestTimeout` actually seems to be a conservative upper bound for `poll()`. A request could fail with a different error before `requestTimeout` hits and would be retried internally in this case -- if this happens, we might want to start the `taskTimeout` earlier. However, we don't have any means atm to detect this case. Thus, using `requestTimeout` is the best option we have right now (because triggering `taskTimeout` too early seems to be worse than triggering it too late). I created a ticket though that might allow us to improve the code later: https://issues.apache.org/jira/browse/KAFKA-10315
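
To make the `requestTimeout + taskTimeout` idea concrete, here is a minimal sketch of how such a deadline could be tracked around empty polls. It is not the code from this PR: the class `EmptyPollDeadline`, its method names, the value `-1` for `NO_DEADLINE`, and the use of `IllegalStateException` are all assumptions made for illustration, and the current time is passed in explicitly so the logic can be exercised without a real consumer.

```java
/**
 * Hypothetical helper, not part of Kafka Streams: arms a deadline on the
 * first empty poll and clears it again as soon as records arrive.
 */
final class EmptyPollDeadline {
    // Assumed sentinel value; the diff only shows the constant's name.
    static final long NO_DEADLINE = -1L;

    private final long requestTimeoutMs;
    private final long taskTimeoutMs;
    private long deadlineMs = NO_DEADLINE;

    // Both timeouts are assumed to be non-negative and small enough that
    // their sum with the current time does not overflow a long.
    EmptyPollDeadline(final long requestTimeoutMs, final long taskTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /** Call after a poll that returned no records. */
    void onEmptyPoll(final long nowMs) {
        if (deadlineMs == NO_DEADLINE) {
            // pollTimeout is deliberately left out: only the conservative
            // requestTimeout bound plus the taskTimeout budget is counted.
            deadlineMs = nowMs + requestTimeoutMs + taskTimeoutMs;
        } else if (nowMs >= deadlineMs) {
            // Stand-in exception; real code would likely use a Kafka-specific one.
            throw new IllegalStateException(
                "No records received before deadline " + deadlineMs + " ms");
        }
    }

    /** Call after a poll that returned at least one record. */
    void onProgress() {
        deadlineMs = NO_DEADLINE;
    }

    long deadlineMs() {
        return deadlineMs;
    }
}
```

In a restore loop, `onEmptyPoll(now)` would be called whenever `records.isEmpty()` holds and `onProgress()` otherwise. Because the deadline starts only after a full `requestTimeout` has been granted, this sketch errs on the side of triggering too late rather than too early, which matches the trade-off described above.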