vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460338414



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> 
position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
 
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
             long restoreCount = 0L;
 
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
       Ah, that last point sounds good.
   
   I'm not opposed, but it still seems strange to grab a random timeout from 
the client config and apply it to the `poll` method. It kind of makes me wonder 
what we're really trying to do here. If we want to give poll at least 30 
seconds to return data, then we can just give it at least 30 seconds, right? No 
reason to abuse the client configuration.
   
   On the other hand, the pollTime may not even appropriate for the global 
thread, right? It seems more like it was meant for StreamThread to make sure we 
don't block too long and violate the max poll interval. But since the global 
thread is only assigned and not subscribed, it has no constraint on how long it 
should block, except for the taskTimeoutMs config, right?
   
   Clearly, we need to set some kind of lower bound on it, though, but it's not 
clear that it needs to be configurable. Anyway, just food for thought. Feel 
free to keep the requestTimeout if you really think it's appropriate.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to