vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460404740
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
```diff
@@ -275,31 +259,70 @@
     private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(
```

Review comment:

Thanks, @mjsax, that's fair. I promise I'm not equivocating here; I'm just trying to figure out what my intuition is trying to tell me.

It seems like the fundamental problem here is that we can't distinguish among a successful poll that returns no data, a failed poll, and a pending async fetch request. The one thing we do know is that the end offset is beyond our current position, so there _should_ be data to poll; an empty return therefore means either that the fetch failed internally or that it hasn't completed yet.

Stepping back, this seems related to the problem of task idling, in which it's pointless to "idle" for a time so short that we have no chance of actually getting a response back from the broker. I feel like this is substantially my fault from #4855 / KIP-266. The purpose of making this API completely async was to avoid harming liveness in situations where we might have a relatively strict deadline, but that's not the case here.

I guess the "poor man's" solution we're going for here is to block poll for at least long enough to allow a complete fetch round-trip to the broker. If we knew there was a round-trip and we didn't get any data, we could conclude that there was an error (since we know there is data to get). Since we can't know that there was a round-trip, we weaken the condition to: if it's been long enough that there should have been a round-trip and we still don't get data, we conclude there was probably an error.

In your KIP, we specified that we would start the task timer _after_ the first error, so it seems like we really want to block the poll for the round-trip time and then apply your "update deadline, etc." function.

I'm with you now that to get the round-trip time, we have to extract some config(s) from the Consumer. It's a pretty awkward hack, but now that I've thought it through, it seems like the best we can do.
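Just to make sure we're picturing the same shape of logic, here's a rough sketch of the loop I have in mind (not a concrete proposal: `fetchRoundTripMs` and the restore-loop body are made up for illustration, and I'm guessing at the exact signature of the `maybeUpdateDeadlineOrThrow` helper from this PR):

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Sketch only: block poll() for roughly one fetch round-trip, so that an empty result
// most likely means "something went wrong" rather than "the async fetch hasn't
// completed yet", and only then start/advance the task timeout.
long restoreOnePartition(final Consumer<byte[], byte[]> globalConsumer,
                         long offset,
                         final long highWatermark,
                         final long fetchRoundTripMs) { // hypothetical: derived from consumer configs
    long deadlineMs = NO_DEADLINE; // NO_DEADLINE / maybeUpdateDeadlineOrThrow as introduced in this PR
    while (offset < highWatermark) {
        final ConsumerRecords<byte[], byte[]> records =
            globalConsumer.poll(Duration.ofMillis(fetchRoundTripMs));
        if (records.isEmpty()) {
            // offset < highWatermark, so there *is* data to get; after a full
            // round-trip with nothing returned, treat it as a probable error.
            deadlineMs = maybeUpdateDeadlineOrThrow(deadlineMs);
        } else {
            deadlineMs = NO_DEADLINE; // we made progress, so reset the timer
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                // ... convert the record and hand it to the restore callback ...
                offset = record.offset() + 1;
            }
        }
    }
    return offset;
}
```

The point is just that we only ever interpret an empty poll as an error after we've given the broker a realistic chance to answer.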
Maybe we can mull it over and file an improvement Jira for the Consumer to better support use cases like this.

Anyway, it seems like the "poll time" config is irrelevant; we just need to know which config to grab that corresponds to completing a fetch request with high probability. It seems like we shouldn't need to update metadata, so we would send a fetch request on the first poll call, and then we just need to block for whatever time bounds the fetch response. I'm honestly not sure what timeout would be best here. It looks like the ConsumerNetworkClient will just wait for a response until it gets a "disconnect" (L598). Is that a socket timeout? I'm not sure.
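For example (purely illustrative, and guessing at which configs actually matter), we could derive the blocking time from the config map we already build for the global consumer; `request.timeout.ms` bounds how long the client waits for any single response before giving up on the connection, and `fetch.max.wait.ms` is how long the broker may hold a fetch open before responding:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Hypothetical helper: pick a "one fetch round-trip" bound out of the consumer configs.
// The fallbacks are the consumer's documented defaults (30s and 500ms).
static long fetchRoundTripBoundMs(final Map<String, Object> consumerConfigs) {
    final long requestTimeoutMs = Long.parseLong(String.valueOf(
        consumerConfigs.getOrDefault(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000)));
    final long fetchMaxWaitMs = Long.parseLong(String.valueOf(
        consumerConfigs.getOrDefault(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500)));
    // request.timeout.ms is supposed to be larger than fetch.max.wait.ms, so it should
    // dominate, but take the max just in case they're configured oddly.
    return Math.max(requestTimeoutMs, fetchMaxWaitMs);
}
```

If the ConsumerNetworkClient "disconnect" you're pointing at is driven by `request.timeout.ms` rather than a socket-level timeout, then that's probably the config we want, but it's worth confirming before we rely on it.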