mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460320506



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
                             
restoreRecords.add(recordConverter.convert(record));
                         }
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    try {
+                        offset = globalConsumer.position(topicPartition);
+                    } catch (final TimeoutException error) {
+                        // the `globalConsumer.position()` call should never 
block, because we know that we did
+                        // a successful `position()` call above for the 
requested partition and thus the consumer
+                        // should have a valid local position that it can 
return immediately

Review comment:
       I guess with the new `retryUntilSuccessOrThrowOnTaskTimeout` the code is 
still clean is we just use it here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
                             
restoreRecords.add(recordConverter.convert(record));
                         }
                     }
-                    offset = globalConsumer.position(topicPartition);
+                    try {
+                        offset = globalConsumer.position(topicPartition);
+                    } catch (final TimeoutException error) {
+                        // the `globalConsumer.position()` call should never 
block, because we know that we did
+                        // a successful `position()` call above for the 
requested partition and thus the consumer
+                        // should have a valid local position that it can 
return immediately

Review comment:
       I guess with the new `retryUntilSuccessOrThrowOnTaskTimeout` the code is 
still clean is we just use it here, too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to