[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400647#comment-17400647
 ] 

Matthias J. Sax commented on KAFKA-13152:
-----------------------------------------

For (1), I think we need to take time-synchronization into account. If a task 
has multiple partitions and some of their buffers are empty, we might delay 
processing based on the `max.task.idle.ms` config – however, we should always 
allow fetching for the partitions with empty buffers and never pause them; 
otherwise, we would sacrifice ordered processing, and trading semantics for 
"buffer size" does not seem like a good deal. We could even end up in a 
"temporary deadlock": no task is processable because each has at least one 
empty buffer, yet all partitions are paused because we exceeded the max buffer 
space. The deadlock is only temporary, because we would go into "forced 
processing" after `max.task.idle.ms` has passed anyway – or we change the 
behavior and go into "forced processing" right away for this case, without 
waiting for `max.task.idle.ms` (ignoring `max.task.idle.ms` might actually be 
desirable here).
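
To make the point concrete, here is a minimal sketch (plain Java, not actual 
Kafka Streams internals) of the pausing rule described above: a partition is 
only paused when the global byte budget is exceeded AND its own buffer is 
non-empty, so partitions with empty buffers keep fetching and time 
synchronization can still make progress. The names (`BufferState`, 
`shouldPause`) are hypothetical.

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical helper, not Kafka Streams code: tracks buffered bytes per
    // partition against a single global budget (the proposed "input.buffer.max.bytes").
    final class BufferState {
        private final Map<TopicPartition, Long> bufferedBytes; // bytes currently buffered per partition
        private final long maxTotalBytes;                      // global byte budget

        BufferState(Map<TopicPartition, Long> bufferedBytes, long maxTotalBytes) {
            this.bufferedBytes = bufferedBytes;
            this.maxTotalBytes = maxTotalBytes;
        }

        private long totalBytes() {
            return bufferedBytes.values().stream().mapToLong(Long::longValue).sum();
        }

        // Pause only partitions that actually hold data; empty buffers are never
        // paused, so ordered (time-synchronized) processing is not sacrificed.
        boolean shouldPause(TopicPartition tp) {
            long own = bufferedBytes.getOrDefault(tp, 0L);
            return totalBytes() > maxTotalBytes && own > 0;
        }
    }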

Another question I have is about "balanced fetching": atm, we use the same 
buffer space for each partition and pause a single partition if its buffer 
space is exceeded. If we follow (1), could it happen that some partitions 
buffer much more data than others, and could this become a "fairness" problem? 
In the end, I agree that not having the exact same buffer space across all 
partitions can be beneficial: a high-volume topic might be better off using 
more buffer space than a low-volume topic. However, I wonder if we would still 
need some bounds, so that we don't go from the current extreme of giving the 
exact same buffer space to every partition to the opposite extreme, in which 
some partitions might "starve" because their buffer space becomes too small.
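
One way to express such a bound, sketched below under the assumption that we 
keep a single global byte budget: reserve a minimum share per partition, so a 
partition may always fetch while it is below its floor, and may only grow 
beyond the floor while the global budget has headroom. The names 
(`FairBufferBudget`, `minBytesPerPartition`) are made up for illustration; this 
is not an existing Kafka Streams mechanism.

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical "bounded fairness" rule: a global byte budget plus a small
    // guaranteed floor per partition, so high-volume partitions can use more
    // buffer space without starving low-volume ones.
    final class FairBufferBudget {
        private final long maxTotalBytes;        // e.g. the proposed "input.buffer.max.bytes"
        private final long minBytesPerPartition; // guaranteed floor per partition

        FairBufferBudget(long maxTotalBytes, long minBytesPerPartition) {
            this.maxTotalBytes = maxTotalBytes;
            this.minBytesPerPartition = minBytesPerPartition;
        }

        boolean mayFetch(TopicPartition tp, Map<TopicPartition, Long> bufferedBytes) {
            long own = bufferedBytes.getOrDefault(tp, 0L);
            long total = bufferedBytes.values().stream().mapToLong(Long::longValue).sum();
            // Below its floor: always allowed to fetch. Above it: only while the
            // global budget still has headroom.
            return own < minBytesPerPartition || total < maxTotalBytes;
        }
    }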

 

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13152
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Sagar Rao
>            Priority: Major
>              Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to buffer per partition; once it is exceeded, we pause 
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the 
> dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. 
> "input.buffer.max.bytes", which controls how many bytes in total are allowed 
> to be buffered. This is doable since we buffer the raw records as 
> <byte[], byte[]>.
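
For reference, a sketch of how the configs involved would sit next to each 
other in a StreamsConfig. `buffered.records.per.partition` and 
`max.task.idle.ms` exist today; `input.buffer.max.bytes` is only the name 
proposed in this ticket and may not be available in the Kafka version you run.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class BufferConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "buffer-sizing-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // Current: per-partition record-count bound; total memory depends on
            // how many partitions this instance happens to be assigned.
            props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);

            // How long a task with at least one empty input buffer waits before
            // "forced processing" kicks in.
            props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);

            // Proposed in this ticket: one global byte bound across all input
            // partitions (plain string key, since no constant exists for it yet).
            props.put("input.buffer.max.bytes", 512L * 1024 * 1024);
        }
    }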



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
