Hi all,

we've just had a case where we suspect that messages get delayed from being consumed under certain circumstances. I don't necessarily think this is a bug, hence I have not opened a Jira yet, but wanted to discuss it here - there is probably a best practice that I just don't know about.
The scenario is having one consumer that is subscribed to a large number of partitions, some of which are very busy and some of which only receive messages sporadically. When the consumer sends a fetch request for all subscribed partitions, the broker starts filling the response partition by partition while honoring two parameters:

max.partition.fetch.bytes - controls the maximum size of the data that is returned for one individual partition - default: 1 * 1024 * 1024 = 1048576 bytes
fetch.max.bytes - controls the overall maximum size of the data that is returned for the entire fetch request - default: 50 * 1024 * 1024 = 52428800 bytes

So by default a fetch response can contain data from at most 50 partitions, which creates the possibility of "freezing out" partitions if there are a lot of busy partitions in the subscription. I've created a small test to illustrate my concern.

Topics:
000 - 1 partition - 1 message
aaa - 100 partitions - 10 million messages
bbb - 1000 partitions - 50 million messages
mmm - 1 partition - 1 message
zzz - 100 partitions - 10 million messages

When I consume from these with default settings and simply print the time at which I first receive a message from each topic, I get the following:

Got first record from topic aaa after 747 ms
Got first record from topic bbb after 2764 ms
Got first record from topic zzz after 15068 ms
Got first record from topic 000 after 16588 ms
Got first record from topic mmm after 16588 ms

So as we can see, the topics with only one partition get stuck behind the larger topics that have data to be read. I am unsure in what order the broker iterates over the partitions, but I've always seen the same general order in the output, so there seems to be some factor influencing this.

One potential fix I identified is to reduce the max.partition.fetch.bytes parameter so that more partitions can be included in a fetch response. If I rerun the test with a value of 1024 I get:

Got first record from topic aaa after 5446 ms
Got first record from topic bbb after 5469 ms
Got first record from topic zzz after 5744 ms
Got first record from topic mmm after 5762 ms
Got first record from topic 000 after 5762 ms

which looks much better, but I have doubts whether this is the actual solution, as it could lead to a large increase in the number of fetch requests that are sent when only a few partitions have new data: 5 partitions with 10 MB of new data each would fit in 10 requests with default settings (5 * 1 MB per response), but would need 10240 requests with my adjusted settings (5 * 1024 bytes per response).

This topic is currently also being discussed in the thread on KIP-349 [1], but the consensus there seems to be that there is no real need for a feature like this.

Are there common patterns to get around this? The obvious solution would of course be scaling the load across more consumers, either by adding them to the consumer group or by splitting the topics over consumers, but that just makes it a question of scale until it may happen again.

Would it potentially be worthwhile to look into code changes to improve handling of these edge cases? For example, keeping track of the last time each partition returned data for a consumer group and prioritizing the "oldest" partitions. This would need memory on the broker though, which might turn out to be quite a lot since it would scale with partition count and the number of consumer groups. Alternatively, some sort of feedback could be added to the consumer about partitions that were not checked due to the limits, but that would need a wire protocol change.
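For reference, the two settings are applied on the consumer roughly as in the sketch below (simplified, not the exact test code; the bootstrap server, group id and class name are just placeholders):

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FetchStarvationTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-test");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // The two limits discussed above (defaults: 1048576 and 52428800 bytes).
        // The max.partition.fetch.bytes override is what produced the second set of timings.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);

        long start = System.currentTimeMillis();
        Set<String> seenTopics = new HashSet<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("000", "aaa", "bbb", "mmm", "zzz"));
            while (seenTopics.size() < 5) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Print the elapsed time the first time we see each topic.
                    if (seenTopics.add(record.topic())) {
                        System.out.printf("Got first record from topic %s after %d ms%n",
                                record.topic(), System.currentTimeMillis() - start);
                    }
                }
            }
        }
    }
}

Dropping the MAX_PARTITION_FETCH_BYTES_CONFIG override falls back to the 1 MB default, i.e. the configuration that produced the first set of timings.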
Or perhaps a little consumer-side logic that starts fragmenting fetch requests if it notices that responses always contain data from the maximum possible number of partitions.

Best regards,
Sönke

[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics