vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460404740



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final RecordConverter recordConverter) {
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
             final Long checkpoint = checkpointFileCache.get(topicPartition);
             if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
            } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                final AtomicLong position = new AtomicLong();
+                retryUntilSuccessOrThrowOnTaskTimeout(
+                    () -> position.set(globalConsumer.position(topicPartition)),
+                    String.format(
+                        "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
+                        topicPartition
+                    )
+                );
+                offset = position.get();
             }
 
-            long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
             final RecordBatchingStateRestoreCallback stateRestoreAdapter =
                 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
            stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
+            long deadlineMs = NO_DEADLINE;
             while (offset < highWatermark) {
                 try {
                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                    if (records.isEmpty()) {
+                        if (taskTimeoutMs == 0L) {
+                            deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
       Thanks, @mjsax , that's fair.
   
   I promise I'm not equivocating here; I'm just trying to figure out what my 
intuition is trying to tell me.
   
   It seems like maybe the fundamental problem here is that we can't distinguish among a successful poll that returns no data, a failure to poll, and a pending async fetch request. The one thing we know is that the end offset is beyond our current position, so there _should_ be data to poll; an empty return therefore means either that the fetch failed internally or that it hasn't completed yet.
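
   To make that concrete, here's a minimal sketch (a hypothetical helper, not code from this PR) of the only disambiguating signal we actually have:

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class PollAmbiguitySketch {
    // We already know highWatermark > offset, so the broker definitely has
    // records for us. An empty poll() therefore cannot mean "no data"; it can
    // only mean the fetch failed internally or the async request is still in
    // flight -- and the caller has no way to tell those two cases apart.
    static boolean fetchFailedOrStillPending(final Consumer<byte[], byte[]> consumer,
                                             final Duration pollTime,
                                             final long offset,
                                             final long highWatermark) {
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTime);
        return records.isEmpty() && offset < highWatermark;
    }
}
```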
   
   Stepping back, this seems to be related to the problem of task idling, in 
which it's pointless to "idle" for a time so short that we have no chance to 
actually get a response back from the broker.
   
   I feel like this is substantially my fault from #4855 / KIP-266. The purpose 
of making this API completely async was to avoid harming liveness in situations 
where we might have a relatively strict deadline. But that's not the case here.
   
   I guess the "poor man's" solution we're going for here is to block poll for 
at least long enough to allow for a complete fetch round-trip from the broker. 
If we know that there was a round-trip, and we didn't get any data, then we can 
conclude that there was an error (since we know there is data to get). Since we 
can't know that there was a round trip, we weaken the condition to: if we know 
it's been long enough that there should have been a round-trip and we don't get 
data, we conclude there was probably an error.
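
   Roughly, that weakened check could look like this (a sketch with a hypothetical roundTripMs parameter; not the PR's actual code):

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class BoundedPollSketch {
    // Block poll() for at least one presumed fetch round-trip. If we still
    // come back empty even though offset < highWatermark, treat it as a
    // probable (transient) error -- this is the point where the task-timeout
    // deadline logic from the KIP would take over.
    static ConsumerRecords<byte[], byte[]> pollAtLeastOneRoundTrip(
        final Consumer<byte[], byte[]> consumer,
        final long roundTripMs,          // hypothetical: derived from consumer configs
        final long offset,
        final long highWatermark
    ) {
        final ConsumerRecords<byte[], byte[]> records =
            consumer.poll(Duration.ofMillis(roundTripMs));
        if (records.isEmpty() && offset < highWatermark) {
            // probable error: start (or check) the task-timeout deadline here
        }
        return records;
    }
}
```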
   
   In your KIP, we specified we would start the task timer _after_ the first error, so it seems like we really want to just block the poll for the round-trip time and then apply your "update deadline, etc." function. I'm with you now that, to get the round-trip time, we have to extract some config(s) from the Consumer. This is a pretty awkward hack, but now that I've thought it through, it seems like the best we can do. Maybe we can mull it over and file an improvement Jira for the Consumer to better support use cases like this.
   
   Anyway, it seems like the "poll time" config is irrelevant; we just need to know which config to grab that corresponds to completing a fetch request with high probability. It seems like we shouldn't need to update metadata, so we would send a fetch request on the first poll call and just need to block for whatever time bounds the fetch response.
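
   For illustration, assuming the relevant configs turn out to be request.timeout.ms and fetch.max.wait.ms (and which config is right is exactly the open question), the extraction could look like:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

final class FetchRoundTripBoundSketch {
    // request.timeout.ms caps how long the client waits on any in-flight
    // request; fetch.max.wait.ms caps how long the broker parks a fetch when
    // there is no data. Taking the max is just one plausible bound on
    // "a fetch round-trip should have completed by now".
    static long presumedFetchRoundTripMs(final Map<String, Object> consumerConfig) {
        final long requestTimeoutMs = Long.parseLong(
            String.valueOf(consumerConfig.get(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)));
        final long fetchMaxWaitMs = Long.parseLong(
            String.valueOf(consumerConfig.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)));
        return Math.max(requestTimeoutMs, fetchMaxWaitMs);
    }
}
```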
   
   I'm honestly not sure what timeout would be best here. It looks like the 
ConsumerNetworkClient will just wait for a response until it gets a 
"disconnect" (L598). Is that a socket timeout? I'm not sure.



