ableegoldman commented on a change in pull request #8913: URL: https://github.com/apache/kafka/pull/8913#discussion_r444368127
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########

@@ -303,7 +303,8 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
     public long lagFor(final TaskId task) {
         final Long totalLag = taskLagTotals.get(task);
         if (totalLag == null) {
-            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
+            throw new IllegalStateException("Tried to lookup lag for unknown task: " + task
+                + " (This exception may be caused by that you don't call KafkaStreams#cleanUp when topology optimization is enabled)");

Review comment:
Can we drop this comment? While it's definitely true, this exception might also have other causes, and I wouldn't want someone to jump the gun and reset their application unnecessarily. If someone hits this, they'll probably open a ticket or send a question to the mailing list, and then we can follow up with them to see whether this is the cause.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
##########

@@ -130,6 +130,8 @@ public static void main(final String[] args) throws Exception {
             }
         });

+        if (streamsProperties.containsKey("streams.cleanup")
+            && Boolean.parseBoolean(streamsProperties.getProperty("streams.cleanup")))
         streams.cleanUp();

Review comment:
I think we can just call `cleanUp` here without passing in a flag. We only _need_ to do it the second time around, but it doesn't hurt to call it every time, and it keeps things simple (the system tests are already complicated enough 🙂)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
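
A side note on the `streams.cleanup` guard in the second hunk, in case the flag is kept rather than dropped: the sketch below illustrates that the `containsKey` check adds nothing for a plain `Properties` object, because `getProperty` returns `null` for a missing key and `Boolean.parseBoolean(null)` returns `false`. The class and method names are hypothetical and exist only for this illustration. It also assumes the `Properties` instance has no defaults table, since `getProperty` consults defaults while `containsKey` does not.

```java
import java.util.Properties;

// Hypothetical, standalone illustration; not part of the Kafka code base.
public class CleanupFlagSketch {

    // Returns true only when "streams.cleanup" is present and equals "true"
    // (ignoring case). A missing key yields null from getProperty, and
    // Boolean.parseBoolean(null) is false, so no containsKey guard is needed.
    public static boolean cleanupRequested(final Properties props) {
        return Boolean.parseBoolean(props.getProperty("streams.cleanup"));
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        System.out.println(cleanupRequested(props)); // key absent

        props.setProperty("streams.cleanup", "true");
        System.out.println(cleanupRequested(props)); // key set to "true"
    }
}
```

Calling `cleanUp` unconditionally, as the review comment proposes, removes the need for this check altogether.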