guozhangwang commented on a change in pull request #11278: URL: https://github.com/apache/kafka/pull/11278#discussion_r704796291

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -233,6 +234,14 @@ public boolean hasNamedTopologies() {
         return !builders.containsKey(UNNAMED_TOPOLOGY);
     }
 
+    /**
+     * @return true iff the app is using named topologies, or was started up with no topology at all
+     * and the max buffer was set for the named topologies

Review comment:
This may be related to @ableegoldman's meta question: in the future, do we set `maxBufferSize = true` if any one of the named topologies has it overridden, or only when all topologies inside the `TopologyMetadata` have this config overridden?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -151,9 +152,26 @@ void handleRebalanceComplete() {
 
         releaseLockedUnassignedTaskDirectories();
 
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final Task task : tasks.allTasks()) {
+                tasksTotalMaxBuffer.put(
+                    task.id().topologyName(),
+                    task.maxBuffer() + tasksTotalMaxBuffer.getOrDefault(task.id().topologyName(), 0L)

Review comment:
I'm not sure I understand this logic: why do we add these two values to update `tasksTotalMaxBuffer`? How would `task.maxBuffer()` be inferred in the future? Since the values are currently always 0, I cannot tell how this would affect the update logic.
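
For reference, the `put(...)` / `getOrDefault(...)` pattern in the `TaskManager.java` hunk reads as a running per-topology sum of `task.maxBuffer()`. Below is a minimal, standalone sketch of that reading. `BufferTotalsSketch` and `TaskInfo` are hypothetical stand-ins and not Kafka classes, and the buffer values are made up for illustration, since in the PR they are currently always 0.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BufferTotalsSketch {

    // Hypothetical stand-in for a task: only the two values the diff reads.
    static final class TaskInfo {
        final String topologyName;
        final long maxBuffer;

        TaskInfo(final String topologyName, final long maxBuffer) {
            this.topologyName = topologyName;
            this.maxBuffer = maxBuffer;
        }
    }

    // Sums maxBuffer per topology name. Map.merge does the same work as
    // put(key, value + getOrDefault(key, 0L)) in the diff.
    static Map<String, Long> totalPerTopology(final List<TaskInfo> tasks) {
        final Map<String, Long> totals = new HashMap<>();
        for (final TaskInfo task : tasks) {
            totals.merge(task.topologyName, task.maxBuffer, Long::sum);
        }
        return totals;
    }

    public static void main(final String[] args) {
        // Illustrative values only.
        final List<TaskInfo> tasks = Arrays.asList(
            new TaskInfo("topology-a", 10L),
            new TaskInfo("topology-a", 20L),
            new TaskInfo("topology-b", 5L)
        );
        System.out.println(totalPerTopology(tasks));
    }
}
```

The sketch starts from a fresh map on every call. If the totals instead live in a long-lived field that is not cleared, repeated invocations would add the same tasks again, which may be worth checking for `handleRebalanceComplete()`.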