mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460307083
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ########## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); + long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); + offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); + final AtomicLong position = new AtomicLong(); + retryUntilSuccessOrThrowOnTaskTimeout( + () -> position.set(globalConsumer.position(topicPartition)), + String.format( + "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", + topicPartition + ) + ); + offset = position.get(); } - long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; + long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime); + if (records.isEmpty()) { + if (taskTimeoutMs == 0L) { + deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: > Can you explain how the outcome is different than just calling globalConsumer.poll(requestTimeoutMs)? I did consider it, but was not 100% sure whether it would be better. I was also concerned about misusing the request timeout (as you already mentioned), since we have `StreamsConfig.POLL_TIMEOUT_MS`. I like the proposal to use `pollTime + taskTimeoutMs` though! 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org