mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460341186



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
            } else {
                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
 
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
       Well, the requestTimeout is not really "random" -- if we send a fetch
request, it is the maximum time until the consumer would abort the request and
retry. _If_ poll() threw a TimeoutException, this would be the point at which
we time out when there are no retries. To be fair, maybe using the consumer's
_default_api_timeout_ms_ config would be better than using requestTimeout
though? The point is that picking an existing consumer config seems to be the
best thing we can do here, instead of a hard-coded number (or introducing a
new config for this specific corner case).
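   For illustration only, a minimal sketch of what deriving the bound from the
consumer's `default.api.timeout.ms` could look like. The class and method names
here are hypothetical, and using `StreamsConfig#getGlobalConsumerConfigs` for
this is my assumption, not the PR's actual code:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Hypothetical sketch: bound a single restore attempt by the consumer's
// default.api.timeout.ms instead of a hard-coded number or a new config.
final class RestoreDeadlineSketch {
    static long pollDeadlineMs(final StreamsConfig config, final long nowMs) {
        final Map<String, Object> consumerProps =
            config.getGlobalConsumerConfigs("sketch-client-id");
        final Object raw = consumerProps.get(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
        // fall back to the consumer's documented 60s default if the user did not set it
        final long defaultApiTimeoutMs = raw == null ? 60_000L : Long.parseLong(raw.toString());
        return nowMs + defaultApiTimeoutMs;
    }
}
```

   The upside of this approach is that users who already tuned their consumer
timeouts get the same behavior here for free, without learning a new config.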
   
   > pollTime may not even appropriate for the global thread
   
   I think it is appropriate (even if for different reasons). At least for
regular processing: (1) we interleave polling and flushing, and (2) when we
stop the client, we want it to be responsive and not be blocked in poll() for
5 minutes (the taskTimeoutMs default). (Which of course implies that one
cannot really "intercept" the startup phase atm -- something we might want to
address, but maybe not in this PR.) See the rough sketch below for what I mean
by keeping each poll() short while still honoring the task timeout.
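   A minimal sketch of that idea, assuming hypothetical names
(`BoundedRestoreSketch`, `restoreWithBoundedPolls`) and a plain wall-clock
instead of the PR's actual helpers and `Time` abstraction:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.streams.errors.StreamsException;

// Hypothetical sketch: keep each poll() short so the global thread stays
// responsive to shutdown, and only give up after task.timeout.ms of
// consecutive empty polls.
final class BoundedRestoreSketch {
    static void restoreWithBoundedPolls(final Consumer<byte[], byte[]> globalConsumer,
                                        final Duration pollTime,
                                        final long taskTimeoutMs,
                                        final long highWatermark,
                                        long offset) {
        long deadlineMs = -1L; // no deadline until we observe an empty poll
        while (offset < highWatermark) {
            final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
            if (records.isEmpty()) {
                final long nowMs = System.currentTimeMillis();
                if (deadlineMs == -1L) {
                    deadlineMs = nowMs + taskTimeoutMs; // start the task-timeout clock
                } else if (nowMs >= deadlineMs) {
                    throw new StreamsException(
                        "Global state restore did not make progress within task.timeout.ms");
                }
                continue;
            }
            deadlineMs = -1L; // made progress, reset the clock
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                // ... apply the record via the restore callback ...
                offset = record.offset() + 1L;
            }
        }
    }
}
```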
   
   > Clearly, we need to set some kind of lower bound on it, though, but it's 
not clear that it needs to be configurable.
   
   I think the worst we can do is to make it not configurable. :) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

