mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460341186
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
Well, the requestTimeout is not really "random": if we send a fetch request, it's the maximum time until the consumer would abort the request and retry. _If_ poll() threw a TimeoutException, this would be the point at which we get a timeout if there are no retries. To be fair, maybe using the consumer's _default_api_timeout_ms_ config might be better than using requestTimeout though?
The point is that picking a consumer config seems to be the best thing we can do for this case, instead of a hard-coded number (or introducing a new config for this specific corner case).

> pollTime may not even appropriate for the global thread

I think it is (even if in a different way). At least for regular processing: (1) we interleave polling and flushing, and (2) when we stop the client, we want it to be responsive and not blocked in poll() for 5 minutes (the taskTimeoutMs default) in that case. (Which of course implies that one cannot really "intercept" the startup phase atm; that is something we might want to address, but maybe not in this PR.)

> Clearly, we need to set some kind of lower bound on it, though, but it's not clear that it needs to be configurable.

I think the worst we can do is to make it not configurable. :)
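
To make the deadline idea concrete, here is a minimal sketch of how an empty-poll deadline could be tracked. It is not the code in this PR: the class name `EmptyPollDeadline`, the method `update`, and the exception type are hypothetical, and `timeoutMs` stands for whichever consumer config we end up picking (requestTimeout or the default API timeout).

```java
// Hypothetical helper for illustration only; not part of GlobalStateManagerImpl.
final class EmptyPollDeadline {
    // Sentinel meaning "no empty poll has been observed yet".
    static final long NO_DEADLINE = -1L;

    // Unchecked, like Kafka's own TimeoutException, so callers need no throws clause.
    static final class DeadlineExceededException extends RuntimeException {
        DeadlineExceededException(final String message) {
            super(message);
        }
    }

    private EmptyPollDeadline() { }

    /**
     * Called after a poll that returned no records.
     * Starts a deadline on the first empty poll, keeps it on later ones,
     * and throws once the current time has reached the deadline.
     *
     * @param currentDeadlineMs the deadline so far, or NO_DEADLINE
     * @param nowMs             the current time in milliseconds
     * @param timeoutMs         assumed to come from a consumer config, not a hard-coded number
     */
    static long update(final long currentDeadlineMs, final long nowMs, final long timeoutMs) {
        if (currentDeadlineMs == NO_DEADLINE) {
            return nowMs + timeoutMs;
        }
        if (nowMs >= currentDeadlineMs) {
            throw new DeadlineExceededException(
                "No records received for " + timeoutMs + " ms; the broker may be unavailable.");
        }
        return currentDeadlineMs;
    }
}
```

In this sketch the caller would set the deadline back to `NO_DEADLINE` whenever poll() returns records, so only a continuous run of empty polls can trigger the timeout.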