mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r458965799
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                     restoreRecords.add(recordConverter.convert(record));
                 }
             }
-            offset = globalConsumer.position(topicPartition);
+            try {
+                offset = globalConsumer.position(topicPartition);
+            } catch (final TimeoutException error) {
+                // the `globalConsumer.position()` call should never block, because we know that we did
+                // a successful `position()` call above for the requested partition and thus the consumer
+                // should have a valid local position that it can return immediately
+
+                // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+                throw new IllegalStateException(error);
+            }
+
             stateRestoreAdapter.restoreBatch(restoreRecords);
             stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
             restoreCount += restoreRecords.size();

Review comment:
@vvcephei I actually had a follow-up thought: given that we fetch data for a single partition only, should we trigger a timeout within this loop if `records` is empty (i.e., `poll()` did not return anything)? Otherwise, this restore loop might "hang" forever if we lose the connection to the broker. IMHO, `task.timeout.ms` should cover this case? If it does not, we block on startup, because the initial global store loading would never finish and we don't even start the actual `StreamThreads`.
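
To make the suggestion concrete, here is a rough, stdlib-only sketch of the idea. It is not the actual `GlobalStateManagerImpl` code. All names are hypothetical: `poll` stands in for `globalConsumer.poll()` on the single restored partition, `clockMs` for the current time, `taskTimeoutMs` for `task.timeout.ms`, and `RestoreTimeoutException` for whatever exception the real code would throw. The deadline starts on the first empty poll and is reset whenever a poll returns records.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class RestoreTimeoutSketch {

    /** Hypothetical stand-in for the exception a real implementation would throw. */
    static class RestoreTimeoutException extends RuntimeException {
        RestoreTimeoutException(final String message) {
            super(message);
        }
    }

    /**
     * Restores records until endOffset is reached and returns the final offset.
     * Throws if consecutive empty polls last for taskTimeoutMs or longer.
     */
    static long restore(final Supplier<List<String>> poll,
                        final LongSupplier clockMs,
                        final long endOffset,
                        final long taskTimeoutMs) {
        long offset = 0L;
        long deadlineMs = -1L; // -1 means no timeout is currently running

        while (offset < endOffset) {
            final List<String> records = poll.get();

            if (records.isEmpty()) {
                final long nowMs = clockMs.getAsLong();
                if (deadlineMs < 0L) {
                    // first empty poll in a row: start the timeout
                    deadlineMs = nowMs + taskTimeoutMs;
                } else if (nowMs >= deadlineMs) {
                    throw new RestoreTimeoutException(
                        "No records received for " + taskTimeoutMs + " ms at offset " + offset);
                }
                continue;
            }

            // we made progress, so any running timeout is reset
            deadlineMs = -1L;
            offset += records.size();
        }
        return offset;
    }

    public static void main(final String[] args) {
        final long[] nowMs = {0L};
        try {
            // simulated lost broker connection: every poll comes back empty
            restore(List::of, () -> nowMs[0] += 50L, 10L, 100L);
        } catch (final RestoreTimeoutException e) {
            System.out.println("restore aborted: " + e.getMessage());
        }
    }
}
```

Whether the deadline should be reset on every non-empty poll, or should span the whole restore, is a design choice this sketch does not settle.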