vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452951286



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -359,11 +359,8 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
         
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, 
assignmentErrorCode);
         final AtomicLong nextScheduledRebalanceMs = new 
AtomicLong(Long.MAX_VALUE);
         
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, 
nextScheduledRebalanceMs);
-        String originalReset = null;
-        if (!builder.latestResetTopicsPattern().pattern().equals("") || 
!builder.earliestResetTopicsPattern().pattern().equals("")) {
-            originalReset = (String) 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none");
-        }

Review comment:
       I had to change this to get the StreamThread test to actually use the 
configured `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG := earliest`
   
   Reading the conditional, it doesn't make any sense to me, but it's been in 
the codebase for a long time, so I'm doubting myself. It seems to say that we 
will only use the provided client configuration if there **is** an override, 
but it seems like it should have been "if there is **not** an override".
   
   Regardless, the "originalReset" is only used as a fallback _after_ we apply 
the builder reset patterns, so I don't see why we should leave it null in any 
case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to