vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r461143961



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
 
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(
Review comment:
       Ok, thanks @mjsax. I just traced through the consumer code again, and have finally been able to see what you already knew: `request.timeout.ms` is indeed the correct amount of time to wait.
   
   Namely, we send a fetch here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1285

   Which calls through to client.send here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L263

   Which fills in the `request.timeout.ms` config value here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L106

   Which uses it to construct a ClientRequest here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L129-L130

   Which then gets used to create an InFlightRequest when it gets sent here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1248

   Which is later used to detect expired requests here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L162

   Which is used to list nodes (brokers) for which there is an expired request here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L179

   Which is then processed as a "disconnection" here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L803
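
   To make that chain concrete, here is a minimal sketch of the expiry mechanism (illustrative only, not the actual NetworkClient/InFlightRequests code; the field names are stand-ins):

   ```java
   // Illustrative sketch: a request that has seen no response for
   // request.timeout.ms is considered expired, and the client then treats
   // the corresponding broker as disconnected.
   final class InFlightRequestSketch {
       final long sendTimeMs;        // when the fetch was written to the socket
       final long requestTimeoutMs;  // the request.timeout.ms config value

       InFlightRequestSketch(final long sendTimeMs, final long requestTimeoutMs) {
           this.sendTimeMs = sendTimeMs;
           this.requestTimeoutMs = requestTimeoutMs;
       }

       boolean isExpired(final long nowMs) {
           return nowMs - sendTimeMs > requestTimeoutMs;
       }
   }
   ```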
   
   It also looks like the KafkaClient just checks for a network response in a tight loop, so we don't really need any extra time to account for sampling errors. Also, using the sum as the poll duration still seems just as good as your retry logic, so I think the duration parameter is fine.
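
   Concretely, the sum could be derived like this (a sketch; the parameter names are stand-ins for however the config values are plumbed in):

   ```java
   import java.time.Duration;

   // Sketch of the summed poll duration used during restore: one
   // request.timeout.ms covers the initial fetch expiring silently, and
   // task.timeout.ms bounds any retries on top of that.
   static Duration restorePollDuration(final Duration pollTime,
                                       final long requestTimeoutMs,
                                       final long taskTimeoutMs) {
       return pollTime
           .plus(Duration.ofMillis(requestTimeoutMs))
           .plus(Duration.ofMillis(taskTimeoutMs));
   }
   ```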
   
   My only remaining question, which maybe doesn't matter much either way, is whether `poll.ms` really belongs in the sum. It seems like the desired semantics are already achieved by waiting `request.timeout.ms` for the initial failure, plus an extra `task.timeout.ms` for any retries, along the lines of the sketch below.
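
   That alternative might look something like this (a sketch under those assumptions; all names here are illustrative, not the PR's actual helpers):

   ```java
   import java.time.Duration;
   import java.util.function.LongSupplier;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.common.errors.TimeoutException;

   // Sketch: the first poll gets request.timeout.ms to fail, and subsequent
   // empty polls are bounded by a task.timeout.ms deadline on top of that.
   static ConsumerRecords<byte[], byte[]> pollForRestore(final Consumer<byte[], byte[]> consumer,
                                                         final long requestTimeoutMs,
                                                         final long taskTimeoutMs,
                                                         final LongSupplier clockMs) {
       final long deadlineMs = clockMs.getAsLong() + requestTimeoutMs + taskTimeoutMs;
       while (true) {
           final ConsumerRecords<byte[], byte[]> records =
               consumer.poll(Duration.ofMillis(requestTimeoutMs));
           if (!records.isEmpty()) {
               return records;
           }
           if (clockMs.getAsLong() >= deadlineMs) {
               throw new TimeoutException("Restore consumer made no progress within task.timeout.ms");
           }
       }
   }
   ```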

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -292,21 +278,36 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
 
             while (offset < highWatermark) {
                 try {
-                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    final ConsumerRecords<byte[], byte[]> records =
+                        globalConsumer.poll(pollTimePlusRequestTimeoutPlusTaskTimeout);

Review comment:
       Unfortunately, GitHub ate the very extensive thread about this: https://github.com/apache/kafka/pull/9047/files/0e07109bb0cfe87c85e76fbab3b50e9274300388#r460449901




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
