[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401302#comment-17401302 ]
Matthias J. Sax commented on KAFKA-13152:
-----------------------------------------

[~guozhang] – your proposal to only pause partitions with non-empty buffers SGTM. About fairness, I was not sure if we can/should rely on the consumer, but if you think it's the right way to go, I am ok with it. I just wanted to raise the question to ensure that we make a conscious decision.

[~sagarrao] – I think we should move the discussion into the KIP? The scope seems clear now, and we have two proposals: divide the given buffer size across threads (or maybe even tasks, etc.), or follow Guozhang's proposal. It seems your concern is similar to my concern about fairness. Guozhang pointed out that we should achieve fairness within a thread (due to the consumer's round-robin fetching), but I guess your point is a good one: it's unclear if we achieve fairness across threads? [~guozhang] WDYT about this?

In the end the question seems to be whether we can/should keep it simple vs. how complex we want to design the algorithm. Personally, I am afraid of premature optimization and think keeping it simple might be the better way to get started. It might be best if you start to work on a KIP, explain the pros/cons of both approaches, put one into the "rejected alternatives" section, and then we can discuss on the mailing list.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13152
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number of
> records to bookkeep per partition; once it is exceeded, we pause fetching from that
> partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the dynamic
>   number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should consider
> deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes",
> which controls how many bytes in total are allowed to be buffered. This is doable
> since we buffer the raw records as <byte[], byte[]>.
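For illustration only, below is a minimal sketch of how a global byte budget combined with the "only pause partitions with non-empty buffers" idea from the comment could look. This is not the actual Streams implementation; the class name, method names, and the way the budget is wired in are all hypothetical, and the sketch assumes one budget per thread (the simpler option) rather than dividing the configured bytes across threads.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Illustrative sketch only: tracks the total bytes buffered across all
 * partitions handled by one thread and pauses fetching for partitions that
 * still have buffered data once the global budget (e.g. the proposed
 * "input.buffer.max.bytes") is exceeded. All names here are hypothetical.
 */
public class ByteBoundedInputBuffer {

    private final long maxBytes; // global budget for this thread
    private final Map<TopicPartition, Long> bufferedBytes = new HashMap<>();
    private final Set<TopicPartition> paused = new HashSet<>();
    private long totalBytes = 0L;

    public ByteBoundedInputBuffer(final long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Called when a raw <byte[], byte[]> record is added to a partition's buffer. */
    public void recordAdded(final TopicPartition tp, final long recordSizeInBytes) {
        bufferedBytes.merge(tp, recordSizeInBytes, Long::sum);
        totalBytes += recordSizeInBytes;
    }

    /** Called when a buffered record is handed off to processing. */
    public void recordProcessed(final TopicPartition tp, final long recordSizeInBytes) {
        bufferedBytes.merge(tp, -recordSizeInBytes, Long::sum);
        totalBytes -= recordSizeInBytes;
    }

    /**
     * Pause only the partitions that already have buffered data while the budget
     * is exceeded; resume everything once we drop back under the budget.
     * Fairness across the partitions of this thread is left to the consumer's
     * round-robin fetching, as discussed in the comment above.
     */
    public void maybePauseOrResume(final Consumer<byte[], byte[]> consumer) {
        if (totalBytes > maxBytes) {
            for (final Map.Entry<TopicPartition, Long> entry : bufferedBytes.entrySet()) {
                if (entry.getValue() > 0) {
                    paused.add(entry.getKey());
                }
            }
            consumer.pause(paused);
        } else if (!paused.isEmpty()) {
            consumer.resume(paused);
            paused.clear();
        }
    }
}
{code}

The alternative proposal (dividing the configured bytes statically across threads, or even tasks) would give each thread or task its own smaller budget instead of the single per-thread budget assumed above; the trade-offs between the two are what the KIP is expected to spell out.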