mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460449901
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########

```diff
@@ -275,31 +259,70 @@
 private void restoreState(final StateRestoreCallback stateRestoreCallback,
                           final RecordConverter recordConverter) {
     for (final TopicPartition topicPartition : topicPartitions) {
         globalConsumer.assign(Collections.singletonList(topicPartition));
+        long offset;
         final Long checkpoint = checkpointFileCache.get(topicPartition);
         if (checkpoint != null) {
             globalConsumer.seek(topicPartition, checkpoint);
+            offset = checkpoint;
         } else {
             globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+            final AtomicLong position = new AtomicLong();
+            retryUntilSuccessOrThrowOnTaskTimeout(
+                () -> position.set(globalConsumer.position(topicPartition)),
+                String.format(
+                    "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                    topicPartition
+                )
+            );
+            offset = position.get();
         }
-        long offset = globalConsumer.position(topicPartition);
         final Long highWatermark = highWatermarks.get(topicPartition);
         final RecordBatchingStateRestoreCallback stateRestoreAdapter =
             StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
         stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
         long restoreCount = 0L;
+        long deadlineMs = NO_DEADLINE;
         while (offset < highWatermark) {
             try {
                 final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                if (records.isEmpty()) {
+                    if (taskTimeoutMs == 0L) {
+                        deadlineMs = maybeUpdateDeadlineOrThrow(
```

Review comment:

I agree. The "issue" is really that `poll()` does _not_ throw a `TimeoutException`... Also, because we do manual assignment, `poll()` would never return "early", as it never needs to wait to join a consumer group. -- However, compared to `max.task.idle.ms`, we are in a better situation here, because we `poll()` for only a single partition at a time.
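The helper `retryUntilSuccessOrThrowOnTaskTimeout` called in the diff is not shown in this hunk. As a rough, self-contained sketch of the pattern it implements (retry a transient failure until a `task.timeout.ms`-style deadline expires, then rethrow), with all names (`RetrySketch`, `retryPosition`, `TransientException`) invented for illustration and not taken from the PR:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class RetrySketch {

    // Stand-in for a transient broker error (e.g. a client TimeoutException).
    static class TransientException extends RuntimeException { }

    // Retry the supplier until it succeeds or taskTimeoutMs has elapsed;
    // once the deadline passes, the last transient error is rethrown.
    static long retryPosition(final LongSupplier position, final long taskTimeoutMs) {
        final long deadlineMs = System.currentTimeMillis() + taskTimeoutMs;
        while (true) {
            try {
                return position.getAsLong();
            } catch (final TransientException e) {
                if (System.currentTimeMillis() >= deadlineMs) {
                    throw e; // deadline exceeded: surface the error to the caller
                }
                try {
                    Thread.sleep(10L); // brief backoff before the next attempt
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    public static void main(final String[] args) {
        final AtomicLong attempts = new AtomicLong();
        // Supplier fails twice, then returns an offset of 42.
        final long offset = retryPosition(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new TransientException();
            }
            return 42L;
        }, 1_000L);
        System.out.println(offset); // prints 42
    }
}
```

The real helper presumably wraps `globalConsumer.position(topicPartition)` and distinguishes `taskTimeoutMs == 0` specially; this sketch only shows the retry/deadline shape.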
I also agree that applying `task.timeout.ms` should start _after_ we got a first timeout -- this is how the original code worked, which you criticized as:

> Man, this is confusing.

And I agree that the code was not straightforward to understand. But if we think it's the right thing to do, I am also happy to add it back :)

I am also not an expert on all consumer internals, but from my understanding, fetch requests are sent asynchronously in general, and if a fetch request fails, the consumer would not retry it itself; a retry would be triggered by the next `poll()` call. If there is no data available (i.e., the fetch request has not returned yet) when `poll()` is called, the consumer blocks internally until the `poll(Duration)` timeout expires or a fetch request returns (whichever comes first). Furthermore, before `poll()` returns, it always checks whether a fetch request is in-flight, and sends one if not.

Thus, on the very first call to `poll()` we know that no fetch request can be in-flight, and we also know that `poll()` will send one and block until it returns or the `poll(Duration)` timeout expires. So if `poll()` does not block for at least `request.timeout.ms` and we get an empty result back, we don't know which case holds; however, if we use the request timeout, it seems that we _know_ whether the fetch was successful or timed out. We also know that a fetch request will be in-flight after `poll()` returns. Thus, for any subsequent `poll()`, applying the request timeout also ensures that we know whether the request was successful or not. I guess the only difference between what I just described and my original code is that I used `pollTime + requestTimeout`.

Bottom line: I am not 100% sure what you propose. Should we go with the original design, or with the new design? -- In the end, I think we don't need a follow-up PR; we can just try to get it right in this PR.
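To make the timing rule under discussion concrete -- start the `task.timeout.ms` clock only after the first empty `poll()`, and reset it as soon as records arrive -- here is a small self-contained simulation. The names (`DeadlineSketch`, `restore`, `msPerPoll`) are invented for the sketch and are not the PR's actual code; simulated batches of `Integer`s stand in for `ConsumerRecords`:

```java
import java.util.List;

public class DeadlineSketch {
    static final long NO_DEADLINE = -1L;

    static class TaskTimeoutException extends RuntimeException { }

    // Returns the number of records consumed. Each element of `polls`
    // simulates one poll() result; an empty list means no data was
    // returned within the poll timeout. `msPerPoll` is the simulated
    // time spent inside each poll() call.
    static int restore(final List<List<Integer>> polls,
                       final long taskTimeoutMs,
                       final long msPerPoll) {
        long nowMs = 0L;
        long deadlineMs = NO_DEADLINE;
        int restored = 0;
        for (final List<Integer> records : polls) {
            nowMs += msPerPoll;
            if (records.isEmpty()) {
                if (deadlineMs == NO_DEADLINE) {
                    // first "timeout": the task.timeout.ms clock starts now
                    deadlineMs = nowMs + taskTimeoutMs;
                } else if (nowMs >= deadlineMs) {
                    throw new TaskTimeoutException();
                }
            } else {
                deadlineMs = NO_DEADLINE; // progress: reset the clock
                restored += records.size();
            }
        }
        return restored;
    }

    public static void main(final String[] args) {
        // Two data batches with one empty poll in between: the empty poll
        // starts the clock, the next batch resets it, and restore succeeds.
        final int n = restore(List.of(List.of(1, 2), List.of(), List.of(3)), 1_000L, 100L);
        System.out.println(n); // prints 3
    }
}
```

With this rule, a single slow broker response never fails the task outright; only a run of empty polls longer than `task.timeout.ms` after the first timeout does.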
I don't see any benefit in splitting this up into 2 PRs (because, as mentioned above, we fetch from a single partition, and thus it's a different case compared to the `max.task.idle.ms` scenario).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org