[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401034#comment-17401034
 ] 

Sagar Rao commented on KAFKA-13152:
-----------------------------------

[~mjsax], [~guozhang] I had a slightly different thought: we could provision 
it as a global setting and then distribute it across threads, similar to how 
`cache.max.bytes.buffering` works at the moment. 

So, assuming we have T threads and C is the max buffer size, each thread gets 
C/T bytes.

Once the individual StreamThreads have been assigned their respective shares of 
bytes, we can look at the total bytes buffered across the tasks of a thread 
and, if that total exceeds the thread's share, pause the partitions.
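
As a rough illustration of the C/T split and the per-thread check, here is a 
minimal sketch. The class and method names are hypothetical and are not part of 
Kafka Streams; task ids are plain strings so the example stays self-contained.

```java
import java.util.Map;

/** Hypothetical sketch; none of these names exist in Kafka Streams. */
public class ThreadBufferBudget {
    private final long bytesPerThread;

    public ThreadBufferBudget(long maxBufferBytes, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        // Each thread gets an equal share: C / T (integer division rounds down).
        this.bytesPerThread = maxBufferBytes / numThreads;
    }

    public long bytesPerThread() {
        return bytesPerThread;
    }

    /** True when the bytes buffered across all tasks of one thread exceed its share. */
    public boolean shouldPause(Map<String, Long> bufferedBytesByTask) {
        long total = 0L;
        for (long bytes : bufferedBytesByTask.values()) {
            total += bytes;
        }
        return total > bytesPerThread;
    }
}
```

With C = 100 and T = 4, each thread would get 25 bytes, and a thread whose 
tasks buffer 26 bytes in total would be asked to pause.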

We could go one step further and assign each task its share of bytes (by 
dividing equally) and finally a share of bytes per partition (similar to the 
current per-partition records config, but counting bytes, by further equal 
division), but that's just extra complexity, so it can be ignored.

I think assigning C/T bytes to each thread might be beneficial compared to the 
option suggested by Guozhang here: 

 *instead we just monitor the aggregated total bytes across all tasks within 
the instance, when it has been reached* 

Say there is more than one StreamThread in an instance and only one of them is 
exceeding its bounds individually. Because we are looking at the overall byte 
count across all tasks, the other threads would also end up paying the penalty 
and be paused. If users provision the config properly, they should be able to 
pause only the relevant tasks and not all of them. What do you think?
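
To make the difference concrete, here is a small sketch comparing the two 
checks. It is only an interpretation of the two options discussed here, not 
existing Kafka Streams code; all names are hypothetical, and the instance-wide 
check is assumed to pause every thread once the aggregate exceeds C.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical comparison of an instance-wide limit and per-thread shares. */
public class PauseScopeComparison {

    /** Instance-wide limit: once the aggregate exceeds C, every thread is paused. */
    public static List<String> pausedWithGlobalLimit(Map<String, Long> bytesByThread, long maxBytes) {
        long total = 0L;
        for (long bytes : bytesByThread.values()) {
            total += bytes;
        }
        // TreeMap is used only to return thread names in a stable, sorted order.
        return total > maxBytes
            ? new ArrayList<>(new TreeMap<>(bytesByThread).keySet())
            : new ArrayList<>();
    }

    /** Per-thread share C/T: only threads above their own share are paused. */
    public static List<String> pausedWithPerThreadShare(Map<String, Long> bytesByThread, long maxBytes) {
        List<String> paused = new ArrayList<>();
        if (bytesByThread.isEmpty()) {
            return paused;
        }
        long share = maxBytes / bytesByThread.size();
        for (Map.Entry<String, Long> entry : new TreeMap<>(bytesByThread).entrySet()) {
            if (entry.getValue() > share) {
                paused.add(entry.getKey());
            }
        }
        return paused;
    }
}
```

For example, with C = 100 and two threads buffering 70 and 35 bytes, the 
aggregate of 105 exceeds C, so the instance-wide check pauses both threads, 
while the per-thread check (50 bytes each) pauses only the first one.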

Regarding pausing the partitions, I think it makes sense to pause only those 
partitions that have some data, as you both discussed, for simplicity. Maybe 
we can look at heuristics, such as pausing a single partition if it accounts 
for, say, X% of the bytes, or picking the one with the most bytes and pausing 
only that one. That might make it more complicated, but it would lead towards 
pausing only the relevant partitions, which is what 
`buffered.records.per.partition` is able to achieve.
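
The two heuristics could look roughly like the sketch below. The names are 
hypothetical, partitions are identified by plain strings instead of Kafka's 
`TopicPartition` to keep the example self-contained, and the threshold value is 
left to the caller since X is not decided here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical heuristics for choosing which partitions to pause. */
public class PartitionPauseHeuristic {

    /** The non-empty partition with the most buffered bytes, if there is one. */
    public static Optional<String> largestPartition(Map<String, Long> bufferedBytesByPartition) {
        return bufferedBytesByPartition.entrySet().stream()
            .filter(entry -> entry.getValue() > 0)
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey);
    }

    /** Partitions that hold at least thresholdPercent of all buffered bytes. */
    public static List<String> dominantPartitions(Map<String, Long> bufferedBytesByPartition,
                                                  double thresholdPercent) {
        long total = 0L;
        for (long bytes : bufferedBytesByPartition.values()) {
            total += bytes;
        }
        List<String> dominant = new ArrayList<>();
        if (total == 0L) {
            return dominant; // nothing is buffered, so nothing needs pausing
        }
        // TreeMap is used only to return partition names in a stable, sorted order.
        for (Map.Entry<String, Long> entry : new TreeMap<>(bufferedBytesByPartition).entrySet()) {
            if (entry.getValue() * 100.0 / total >= thresholdPercent) {
                dominant.add(entry.getKey());
            }
        }
        return dominant;
    }
}
```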

 

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13152
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to bookkeep, and hence when it is exceeded we would pause 
> fetching from this partition. However this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on 
> the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> And hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. 
> "input.buffer.max.bytes", which controls how many bytes in total are allowed 
> to be buffered. This is doable since we buffer the raw records in 
> <byte[], byte[]>.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
