guozhangwang commented on a change in pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#discussion_r704796291



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -233,6 +234,14 @@ public boolean hasNamedTopologies() {
         return !builders.containsKey(UNNAMED_TOPOLOGY);
     }
 
+    /**
+     * @return true iff the app is using named topologies, or was started up with no topology at all
+     * and the max buffer was set for the named topologies

Review comment:
       This may be related to @ableegoldman's meta question: in the future, do 
we set `maxBufferSize = true` if any one of the named topologies has it 
overridden, or only when all topologies inside the `TopologyMetadata` have this 
config overridden?
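
To make the two options in that question concrete, here is a minimal sketch. Everything in it is hypothetical and for illustration only: the class name, the method names, and the idea of modelling each named topology's override as an `Optional<Long>` are assumptions, not anything taken from `TopologyMetadata`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration; not part of Kafka Streams.
class BufferOverrideSemantics {

    // Option 1: the flag is set as soon as ANY named topology overrides the config.
    static boolean anyOverridden(final Map<String, Optional<Long>> overridesByTopology) {
        return overridesByTopology.values().stream().anyMatch(Optional::isPresent);
    }

    // Option 2: the flag is set only when ALL named topologies override the config.
    // An empty map is treated as "not overridden" here; that choice is an assumption.
    static boolean allOverridden(final Map<String, Optional<Long>> overridesByTopology) {
        return !overridesByTopology.isEmpty()
            && overridesByTopology.values().stream().allMatch(Optional::isPresent);
    }

    public static void main(final String[] args) {
        final Map<String, Optional<Long>> overrides = new LinkedHashMap<>();
        overrides.put("topology-a", Optional.of(1024L)); // overridden
        overrides.put("topology-b", Optional.empty());   // falls back to the default

        System.out.println("any: " + anyOverridden(overrides)); // prints "any: true"
        System.out.println("all: " + allOverridden(overrides)); // prints "all: false"
    }
}
```

The two options only disagree for mixed cases like the one in `main`, which is the case the question above is about.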

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -151,9 +152,26 @@ void handleRebalanceComplete() {
 
         releaseLockedUnassignedTaskDirectories();
 
+        if (topologyMetadata.hasNamedTopologies()) {
+            for (final Task task : tasks.allTasks()) {
+                tasksTotalMaxBuffer.put(
+                    task.id().topologyName(),
+                    task.maxBuffer() + tasksTotalMaxBuffer.getOrDefault(task.id().topologyName(), 0L)

Review comment:
       Not sure I understand this logic: why do we add these two values to 
update `tasksTotalMaxBuffer`? How would `task.maxBuffer()` be inferred in the 
future? Since for now the values are all 0, I cannot tell how this would impact 
the update logic.
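
For reference, one possible reading of the hunk is that it keeps a running per-topology sum of task buffer sizes. The sketch below shows that pattern in isolation. It is an assumption about intent, not the PR's actual code: `TaskInfo` is a hypothetical stand-in for `Task`, and `Map.merge` is used here in place of the `getOrDefault` plus `put` pair in the diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not part of Kafka Streams.
class PerTopologyBufferTotals {

    // Hypothetical stand-in for a task: just a topology name and a buffer size.
    static final class TaskInfo {
        final String topologyName;
        final long maxBuffer;

        TaskInfo(final String topologyName, final long maxBuffer) {
            this.topologyName = topologyName;
            this.maxBuffer = maxBuffer;
        }
    }

    // Sums maxBuffer over all tasks that belong to the same topology.
    // A fresh map is built on every call, so repeated calls do not double count.
    static Map<String, Long> totalMaxBufferByTopology(final List<TaskInfo> tasks) {
        final Map<String, Long> totals = new HashMap<>();
        for (final TaskInfo task : tasks) {
            // Same effect as put(key, value + getOrDefault(key, 0L)).
            totals.merge(task.topologyName, task.maxBuffer, Long::sum);
        }
        return totals;
    }

    public static void main(final String[] args) {
        final List<TaskInfo> tasks = Arrays.asList(
            new TaskInfo("topology-a", 10L),
            new TaskInfo("topology-a", 5L),
            new TaskInfo("topology-b", 7L));

        final Map<String, Long> totals = totalMaxBufferByTopology(tasks);
        System.out.println(totals.get("topology-a")); // prints 15
        System.out.println(totals.get("topology-b")); // prints 7
    }
}
```

If this reading is right, it matters whether the accumulating map is reset between rebalances: a map that lives across calls and is never cleared would add each task's value again every time the loop runs.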




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

