    [ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400647#comment-17400647 ]

Matthias J. Sax commented on KAFKA-13152:
-----------------------------------------

For (1), I think we need to take time-synchronization into account. If a task has multiple partitions and some of them are empty, we might delay processing based on the `max.task.idle.ms` config – however, we should always allow fetching for the partitions with empty buffers and never pause them; otherwise, we would sacrifice ordered processing, and I think a tradeoff between semantics and "buffer size" would not be a good one. We could even end up in a "temporal deadlock": no task is processable because each has at least one empty buffer, but all partitions are paused because we exceeded the max buffer space. The deadlock is only temporal, because we would go into "forced processing" after `max.task.idle.ms` has passed – or we change the behavior and go into "forced processing" right away for this case, without waiting for `max.task.idle.ms` (it might even be desirable to ignore `max.task.idle.ms` here).

Another question I have is about "balanced fetching": at the moment, we use the same buffer space for each partition and pause a single partition if its buffer space is exceeded. If we follow (1), could it happen that some partitions buffer much more data than others, and could this become a "fairness" problem? In the end, I agree that not having the exact same buffer space across all partitions can be beneficial: a high-volume topic might be better off using more buffer space than a low-volume topic. However, I wonder if we would still need some bounds to avoid going from the current extreme of giving the exact same buffer space to every partition, to the opposite extreme in which some partitions might "starve" because their buffer space becomes too small.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13152
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number of records to buffer per partition; once it is exceeded, we pause fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it is hard to bound the memory usage of this buffering. We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records as <byte[], byte[]>.
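
To make the proposal and point (1) a bit more concrete, below is a minimal sketch of how a single, global byte budget could drive the pause decision while never pausing partitions whose buffers are empty. All class and method names are hypothetical illustrations under those assumptions, not the actual Kafka Streams implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: a global byte-based input buffer that decides which
 * partitions to pause. Names and structure are illustrative only.
 */
public class GlobalInputBufferSketch {

    private final long maxTotalBytes; // e.g. the proposed "input.buffer.max.bytes"
    private final Map<String, Deque<byte[]>> buffers = new HashMap<>();
    private long totalBytes = 0L;

    public GlobalInputBufferSketch(final long maxTotalBytes) {
        this.maxTotalBytes = maxTotalBytes;
    }

    /** Buffer a raw record for a partition and account for its size in bytes. */
    public void add(final String partition, final byte[] rawRecord) {
        buffers.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(rawRecord);
        totalBytes += rawRecord.length;
    }

    /** Remove and return the next buffered record of a partition, releasing its bytes. */
    public byte[] poll(final String partition) {
        final Deque<byte[]> queue = buffers.get(partition);
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        final byte[] record = queue.pollFirst();
        totalBytes -= record.length;
        return record;
    }

    /**
     * Partitions to pause on the next consumer poll: only pause when the global
     * byte budget is exceeded, and never pause a partition whose buffer is empty,
     * since that would sacrifice time-synchronized processing as discussed above.
     */
    public Set<String> partitionsToPause() {
        final Set<String> toPause = new HashSet<>();
        if (totalBytes <= maxTotalBytes) {
            return toPause; // under budget: keep fetching from all partitions
        }
        for (final Map.Entry<String, Deque<byte[]>> entry : buffers.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                toPause.add(entry.getKey());
            }
        }
        return toPause;
    }

    public long totalBufferedBytes() {
        return totalBytes;
    }
}
```

A real implementation would additionally need some fairness mechanism, e.g. a per-partition minimum share of the budget, so that high-volume partitions cannot crowd out low-volume ones entirely – the "starvation" concern raised above.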
Matthias J. Sax commented on KAFKA-13152: ----------------------------------------- For (1), I think we need to take time-synchronization into account. If a task has multiple partitions, and some are empty, we might delay processing base on `task.max.idle.ms` config – however, we should always allow to fetch for the partitions of empty buffers and never pause them; otherwise, we would sacrifice ordered processing and I think a tradeoff between semantics and "buffer size" would not be a good one? We could even end up in a "temporal deadlock": no task is processable as it has at least one empty buffer, but all partitions are paused because we exceeded the max-buffer-space – the deadlock is temporal, because we would go into "forced processing" after `task.max.idle.ms` passed though – or we need to change the behavior and go into "forced processed" right away for this case without waiting for `max.task.idle.ms` (but it might be desirable to ignore `task.max.idle.ms`). Another question I have is about "balanced fetching": atm, we use the same buffer space for each partition and pause a single partition if its buffer space is exceeded. If we follow (1), could it happen that some partitions buffer much more data than others, and could this become a "fairness" problem? In the end, I agree that not have the exact same buffer space across all partitions can be beneficial: a high volume topic might be better off using more buffer space than a low volume topic. However, I am wonder if would still need some bounds to avoid that we go from the current extreme to give the exact same buffer space per partitions, to the opposite extreme for which some partitions might "starve" as their buffer space becomes too small? > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > ----------------------------------------------------------------------- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Sagar Rao > Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in <byte[], byte[]>. -- This message was sent by Atlassian Jira (v8.3.4#803005)