mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460320303



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> 
position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
 
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
             long restoreCount = 0L;
 
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(
+                                deadlineMs,
+                                requestTimeoutMs,
+                                new StreamsException(String.format(
+                                    "Global task did not make progress to 
restore state. Retrying is disabled. You can enable it by setting `%s` to a 
value larger than zero.",
+                                    StreamsConfig.TASK_TIMEOUT_MS_CONFIG
+                                ))
+                            );
+                        } else {
+                            deadlineMs = 
maybeUpdateDeadlineOrThrow(deadlineMs);
+                        }
+
+                        continue;
+                    }
+                    deadlineMs = NO_DEADLINE;
+
                     final List<ConsumerRecord<byte[], byte[]>> restoreRecords 
= new ArrayList<>();
                     for (final ConsumerRecord<byte[], byte[]> record : 
records.records(topicPartition)) {
                         if (record.key() != null) {
                             
restoreRecords.add(recordConverter.convert(record));
                         }
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    try {
+                        offset = globalConsumer.position(topicPartition);
+                    } catch (final TimeoutException error) {
+                        // the `globalConsumer.position()` call should never 
block, because we know that we did
+                        // a successful `position()` call above for the 
requested partition and thus the consumer
+                        // should have a valid local position that it can 
return immediately
+
+                        // hence, a `TimeoutException` indicates a bug and 
thus we rethrow it as fatal `IllegalStateException`
+                        throw new IllegalStateException(error);

Review comment:
       As requested by John, I'll drop this and just call 
`retryUntilSuccessOrThrowOnTaskTimeout` here, too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to