[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401034#comment-17401034 ]
Sagar Rao commented on KAFKA-13152:
-----------------------------------

[~mjsax], [~guozhang] I had a slightly different thought: we could provision it as a global setting and then distribute it, similar to how `cache.max.bytes.buffering` works at the moment. Assuming we have T threads and C is the max buffer size, each thread gets C/T bytes. Once the individual StreamThreads have been assigned their respective shares of bytes, we can look at the total bytes consumed across tasks for each thread, and if it exceeds its share, pause that thread's partitions. We could go one step further and also assign each task its share of bytes (by dividing equally), and finally bytes per partition (similar to the current per-partition records config, but counting bytes, by further equal division), but that's just extra complexity and can be ignored.

Where assigning C/T bytes among threads might be beneficial, compared to the option Guozhang suggested here: *instead we just monitor the aggregated total bytes across all tasks within the instance, when it has been reached*, is the following: say there is more than one StreamThread in an instance and only one of them exceeds its bounds individually. Because we would be looking at the overall byte count across all tasks, the other threads would also end up paying the penalty and be paused. If users provision the config properly, they should be able to pause only the relevant tasks, not all of them. What do you think?

Regarding pausing the partitions, I think it makes sense to pause only those partitions that have some data, as you both had discussed, for simplicity. Maybe we could look at heuristics, e.g. if there is a single partition accounting for, say, X% of the bytes, or pick the one with the most bytes, and pause only those. That might make it more complicated, but it would lead towards pausing only the relevant partitions, which is what `buffered.records.per.partition` is able to achieve.
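To make the proposed division concrete, here is a minimal sketch of the idea (hypothetical names and structure, not actual Kafka Streams internals): a global budget C is split equally across T StreamThreads, and each thread compares the bytes buffered across its own tasks against its share, so only the thread that overruns its share would pause its partitions.

```java
import java.util.Map;

// Hypothetical sketch: split a global buffer budget across stream threads
// and decide per thread whether its partitions should be paused.
public class BufferBudgetSketch {

    // Each of T threads gets an equal share of the global budget C.
    static long perThreadShare(long globalMaxBytes, int numThreads) {
        return globalMaxBytes / numThreads;
    }

    // A thread pauses its partitions once the bytes buffered across all of
    // its tasks exceed its share; other threads are unaffected.
    static boolean shouldPause(Map<String, Long> bytesPerTask, long share) {
        long total = bytesPerTask.values().stream().mapToLong(Long::longValue).sum();
        return total > share;
    }

    public static void main(String[] args) {
        long c = 100L * 1024 * 1024;       // C = 100 MiB global budget
        int t = 4;                         // T = 4 stream threads
        long share = perThreadShare(c, t); // 25 MiB per thread

        // Only the thread that exceeds its own share gets paused.
        boolean pauseHot  = shouldPause(Map.of("task-0_0", 30L * 1024 * 1024), share);
        boolean pauseCold = shouldPause(Map.of("task-0_1", 1L * 1024 * 1024), share);
        System.out.println(share + " " + pauseHot + " " + pauseCold);
    }
}
```

Under the instance-wide aggregation instead, the 30 MiB task would push the whole instance over only once the combined total crosses C, but the pause would then hit every thread rather than just the hot one.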
> Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
> -----------------------------------------------------------------------
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Major
> Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number of records to bookkeep; when it is exceeded, we pause fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage for this buffering. We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records as <byte[], byte[]>.
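As a rough illustration of why a bytes-based bound works where a record-count bound does not (hypothetical accounting, not the actual Streams buffer): since the raw records are kept as <byte[], byte[]>, their memory footprint can be approximated directly from the array lengths, whereas a record count says nothing about size.

```java
// Hypothetical sketch: bytes-based accounting of buffered raw records,
// in contrast to the per-partition record count used today.
public class BytesAccountingSketch {

    // Approximate the memory held by one raw record: key bytes + value bytes.
    static long recordBytes(byte[] key, byte[] value) {
        return (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
    }

    public static void main(String[] args) {
        long maxBytes = 1024; // an input.buffer.max.bytes-style global bound
        long buffered = 0;

        // 100 tiny records stay well under the bound; the same record COUNT
        // with 1 KiB values would blow past it, a difference a count-based
        // config cannot see.
        for (int i = 0; i < 100; i++) {
            buffered += recordBytes(new byte[4], new byte[4]);
        }
        System.out.println(buffered + " " + (buffered > maxBytes));
    }
}
```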