mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451895176



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+            // Pause so we won't poll any more records for this task until it has been re-initialized
+            // Note, closeDirty already clears the partitiongroup for the task.
+            mainConsumer().pause(task.inputPartitions());
+            final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(task.inputPartitions());
+            for (final TopicPartition topicPartition : task.inputPartitions()) {
+                final OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
+                if (offsetAndMetadata == null) {
+                    final OffsetResetStrategy strategy = resetStrategy.apply(topicPartition);
+                    switch (strategy) {
+                        case EARLIEST:
+                            mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        case LATEST:
+                            mainConsumer().seekToEnd(Collections.singleton(topicPartition));
+                            break;
+                        default:
+                            throw new IllegalArgumentException("Unexpected reset strategy: " + strategy);
+                    }
+                } else {
+                    mainConsumer().seek(topicPartition, offsetAndMetadata);
+                }
+            }

Review comment:
       Why do you think this is bad? That is just how the API works. Cf. 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L769-L802,
 which does the same thing.
   
   Which makes me wonder whether we can share common code for both cases?
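
   One way the shared code could look, as a rough sketch only: a single helper that takes the partitions, a per-partition strategy function, and the consumer. To keep the sketch runnable without the Kafka client on the classpath, `Reset`, `Seeker`, `RecordingSeeker`, and `resetOffsets` are hypothetical stand-ins (partitions are plain strings here), not names from the PR or from Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class SharedResetSketch {
    /** Hypothetical stand-in for OffsetResetStrategy. */
    public enum Reset { EARLIEST, LATEST }

    /** Hypothetical stand-in for the two consumer seek calls used in the diff. */
    public interface Seeker {
        void seekToBeginning(Collection<String> partitions);
        void seekToEnd(Collection<String> partitions);
    }

    /** Single helper that both call sites could delegate to (assumed shape). */
    public static void resetOffsets(final Collection<String> partitions,
                                    final Function<String, Reset> strategyFor,
                                    final Seeker consumer) {
        for (final String partition : partitions) {
            final Reset strategy = strategyFor.apply(partition);
            switch (strategy) {
                case EARLIEST:
                    consumer.seekToBeginning(Collections.singleton(partition));
                    break;
                case LATEST:
                    consumer.seekToEnd(Collections.singleton(partition));
                    break;
                default:
                    throw new IllegalArgumentException("Unexpected reset strategy: " + strategy);
            }
        }
    }

    /** Records every seek so the behaviour can be inspected without a broker. */
    public static class RecordingSeeker implements Seeker {
        public final List<String> calls = new ArrayList<>();

        @Override
        public void seekToBeginning(final Collection<String> partitions) {
            for (final String p : partitions) {
                calls.add("beginning:" + p);
            }
        }

        @Override
        public void seekToEnd(final Collection<String> partitions) {
            for (final String p : partitions) {
                calls.add("end:" + p);
            }
        }
    }

    public static void main(final String[] args) {
        final RecordingSeeker seeker = new RecordingSeeker();
        // Partitions whose name starts with "a" reset to the beginning, all others to the end.
        resetOffsets(Arrays.asList("a-0", "b-0"),
                     p -> p.startsWith("a") ? Reset.EARLIEST : Reset.LATEST,
                     seeker);
        System.out.println(seeker.calls);
    }
}
```

   In the real code the helper would presumably take `TopicPartition` and the main consumer instead of the stand-ins.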

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -137,7 +138,7 @@
         STARTING(2, 3, 5),                // 1
         PARTITIONS_REVOKED(2, 3, 5),      // 2
         PARTITIONS_ASSIGNED(2, 3, 4, 5),  // 3
-        RUNNING(2, 3, 5),                 // 4
+        RUNNING(2, 3, 4, 5),              // 4

Review comment:
       Why do we need this?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -766,6 +769,24 @@ void runOnce() {
         return records;
     }
 
+    private OffsetResetStrategy getResetStrategy(final TopicPartition partition) {
+        if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+            return OffsetResetStrategy.EARLIEST;
+        } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+            return OffsetResetStrategy.LATEST;
+        } else {
+            if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
+                return OffsetResetStrategy.EARLIEST;
+            }

Review comment:
       > Does Streams override the client default..?
   
   Yes. The client default is "latest", but Streams uses "earliest" by default (cf. 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L857).
 Of course, users can also change the default via StreamsConfig.
   
   Note that the consumer client can only apply a single strategy to all topics 
it is subscribed to. Hence, if all topics use the same reset policy, we can rely 
on the consumer's configured policy. However, if users specify different reset 
policies in their code via `Consumed` for individual topics, the consumer is 
re-configured to use "none" (cf. 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L362-L366)
 and we do a manual seekToBeginning/seekToEnd according to the user-defined 
strategy for the corresponding topic (cf. 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L762-L764),
 because we need to make a per-topic decision that the consumer cannot make for 
us.
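
   To make the per-topic decision concrete, here is a minimal sketch of the lookup order described above: topic-specific patterns first, then the configured default, with "earliest" as the fallback. It deliberately avoids the Kafka classes so it runs standalone; `Reset`, `resolve`, and `configuredReset` are hypothetical names. Mapping a configured "latest"/"earliest" straight to the matching strategy is an assumption, since the quoted diff is cut off after the fallback branch.

```java
import java.util.regex.Pattern;

public class ResetStrategySketch {
    /** Hypothetical stand-in for OffsetResetStrategy. */
    public enum Reset { EARLIEST, LATEST }

    /**
     * Resolve the reset strategy for one topic.
     * Per-topic patterns win; otherwise fall back to the configured default,
     * and to EARLIEST if that default is missing or not earliest/latest.
     */
    public static Reset resolve(final String topic,
                                final Pattern earliestTopics,
                                final Pattern latestTopics,
                                final String configuredReset) {
        if (earliestTopics.matcher(topic).matches()) {
            return Reset.EARLIEST;
        }
        if (latestTopics.matcher(topic).matches()) {
            return Reset.LATEST;
        }
        if ("latest".equals(configuredReset)) {
            return Reset.LATEST; // assumption: the configured default is honored as-is
        }
        // Covers null, "earliest", "none", and anything else.
        return Reset.EARLIEST;
    }

    public static void main(final String[] args) {
        final Pattern earliest = Pattern.compile("clicks|views");
        final Pattern latest = Pattern.compile("alerts");

        System.out.println(resolve("clicks", earliest, latest, "latest"));
        System.out.println(resolve("alerts", earliest, latest, "earliest"));
        System.out.println(resolve("orders", earliest, latest, "latest"));
        System.out.println(resolve("orders", earliest, latest, "none"));
    }
}
```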

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -766,6 +769,24 @@ void runOnce() {
         return records;
     }
 
+    private OffsetResetStrategy getResetStrategy(final TopicPartition partition) {

Review comment:
       Wondering if we should reuse this method within 
`StreamThread#resetInvalidOffsets`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1140,4 +1166,12 @@ public static void executeAndMaybeSwallow(final boolean clean,
             throw e; },
             e -> log.debug("Ignoring error in unclean {}", name));
     }
+
+    boolean hasPreRunningTasks() {
+        return tasks().values().stream().anyMatch(Task::preRunning);
+    }
+
+    public void setResetStrategy(final Function<TopicPartition, OffsetResetStrategy> resetStrategy) {

Review comment:
       I think the name of the method is not ideal. We don't set the strategy; 
we set a function that computes the strategy for a partition. I needed to 
go back and forth on the PR to understand how it works, and I assume the method 
name played its part in confusing me.





