vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452951286
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -359,11 +359,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
         final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
         consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
-        String originalReset = null;
-        if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
-            originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
-        }

Review comment:
I had to change this to get the StreamThread test to actually use the configured `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG := earliest`.

Reading the conditional, it doesn't make any sense to me, but it's been in the codebase for a long time, so I'm doubting myself. It seems to say that we will only use the provided client configuration if there **is** an override, but it seems like it should have been "if there is **not** an override".

Regardless, the "originalReset" is only used as a fallback _after_ we apply the builder reset patterns, so I don't see why we should leave it null in any case.
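
To make the concern concrete, here is a minimal stand-alone sketch, not the actual StreamThread code. It uses a plain `Map` instead of the Kafka classes, and the class `OffsetResetSketch` and both method names are hypothetical. `removedLogic` mirrors the removed lines from the diff; `alwaysCapture` is only one assumed way to avoid leaving `originalReset` null, and is not necessarily what the PR ends up doing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; it does not depend on any Kafka classes.
class OffsetResetSketch {
    // Same string value as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    // Mirrors the removed lines: the configured policy is remembered only
    // when at least one per-topic reset pattern is non-empty.
    static String removedLogic(final Map<String, Object> configs,
                               final String latestPattern,
                               final String earliestPattern) {
        String originalReset = null;
        if (!latestPattern.equals("") || !earliestPattern.equals("")) {
            originalReset = (String) configs.get(AUTO_OFFSET_RESET);
            configs.put(AUTO_OFFSET_RESET, "none");
        }
        return originalReset;
    }

    // Assumed alternative: always remember the configured policy, and only
    // switch the consumer to "none" when a per-topic pattern exists.
    static String alwaysCapture(final Map<String, Object> configs,
                                final String latestPattern,
                                final String earliestPattern) {
        final String originalReset = (String) configs.get(AUTO_OFFSET_RESET);
        if (!latestPattern.isEmpty() || !earliestPattern.isEmpty()) {
            configs.put(AUTO_OFFSET_RESET, "none");
        }
        return originalReset;
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(AUTO_OFFSET_RESET, "earliest");
        // No per-topic patterns: compare what each variant remembers.
        System.out.println("removed logic:  " + removedLogic(new HashMap<>(configs), "", ""));
        System.out.println("always capture: " + alwaysCapture(new HashMap<>(configs), "", ""));
    }
}
```

With no reset patterns, the removed logic hands back null even though the client configuration specifies a policy, which is the case the test ran into. When a pattern is present, both variants behave the same in this sketch.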