vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r461143961
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##########

```diff
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(
```

Review comment:

Ok, thanks @mjsax. I just traced through the consumer code again, and have finally been able to see what you already knew: that `request.timeout.ms` is indeed the correct amount of time to wait.
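As a minimal sketch of the deadline bookkeeping visible in the hunk above: `NO_DEADLINE` and `maybeUpdateDeadlineOrThrow` appear in the diff, but their bodies do not, so the implementation below is an illustrative reconstruction and not the committed Kafka code (in particular, `IllegalStateException` stands in for whatever exception the real method throws).

```java
// Illustrative reconstruction (NOT the committed Kafka code) of the deadline
// logic in the hunk above: the first empty poll starts a task-timeout clock,
// and subsequent empty polls fail once that deadline has passed.
public class DeadlineSketch {
    static final long NO_DEADLINE = -1L;

    static long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs,
                                           final long nowMs,
                                           final long taskTimeoutMs) {
        if (currentDeadlineMs == NO_DEADLINE) {
            // First failure observed: start the clock.
            return nowMs + taskTimeoutMs;
        }
        if (nowMs >= currentDeadlineMs) {
            // Stand-in for the real (unchecked) timeout exception.
            throw new IllegalStateException(
                "Global task did not make progress within the task timeout");
        }
        // Deadline already set and not yet reached: keep retrying.
        return currentDeadlineMs;
    }

    public static void main(final String[] args) {
        long deadline = NO_DEADLINE;
        deadline = maybeUpdateDeadlineOrThrow(deadline, 1_000L, 500L); // sets 1500
        deadline = maybeUpdateDeadlineOrThrow(deadline, 1_200L, 500L); // unchanged
        System.out.println("deadline = " + deadline);
    }
}
```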
Namely:

- We send a fetch here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1285
- Which calls through to `client.send` here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L263
- Which fills in the `request.timeout.ms` config value here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L106
- Which uses it to construct a ClientRequest here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L129-L130
- Which then gets used to create an InFlightRequest when it gets sent here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1248
- Which is later used to detect expired requests here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L162
- Which is used to list nodes (brokers) for which there is an expired request here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L179
- Which is then processed as a "disconnection" here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L803

It also looks like the KafkaClient just does a tight loop checking for a network response, so we don't really need any extra time to account for sampling errors.

Also, it still seems like using the sum as the poll duration is just as good as using your retry logic, so I think the duration parameter is fine.
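For concreteness, the "sum as the poll duration" idea can be sketched as below. This is an assumption inferred from the variable name `pollTimePlusRequestTimeoutPlusTaskTimeout` in the second hunk of this PR, not the committed implementation; the concrete durations in `main` are arbitrary illustrative values, not Kafka defaults.

```java
import java.time.Duration;

// Hedged sketch: the single poll duration passed to globalConsumer.poll(...)
// is assumed to be the plain sum of poll.ms, request.timeout.ms, and
// task.timeout.ms, so one poll covers the initial request timeout plus the
// full retry budget.
public class PollDurationSketch {

    static Duration totalPollTime(final Duration pollTime,
                                  final Duration requestTimeout,
                                  final Duration taskTimeout) {
        return pollTime.plus(requestTimeout).plus(taskTimeout);
    }

    public static void main(final String[] args) {
        // Arbitrary illustrative values.
        final Duration total = totalPollTime(
            Duration.ofMillis(100),      // poll.ms
            Duration.ofMillis(40_000),   // request.timeout.ms
            Duration.ofMillis(300_000)); // task.timeout.ms
        System.out.println("poll duration = " + total.toMillis() + " ms");
    }
}
```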
My only remaining question, which maybe doesn't really matter one way or another, is whether `poll.ms` really belongs here or not. It seems like the desired semantics are accomplished by just waiting `request.timeout.ms` for the initial failure, and then an extra `task.timeout.ms` for any retries.

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ##########

```diff
@@ -292,21 +278,36 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
         while (offset < highWatermark) {
             try {
-                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                final ConsumerRecords<byte[], byte[]> records =
+                    globalConsumer.poll(pollTimePlusRequestTimeoutPlusTaskTimeout);
```

Review comment:

Unfortunately, GitHub ate the very extensive thread about this: https://github.com/apache/kafka/pull/9047/files/0e07109bb0cfe87c85e76fbab3b50e9274300388#r460449901

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org