After some further research I think I answered at least part of this myself.

KIP-74 [1] states the following about ordering of partitions in the
fetch request:
"The solution is to reorder partitions in fetch request in round-robin
fashion to continue fetching from first empty partition received or to
perform random shuffle of partitions before each request."

Which explains the delay in my listing until data is read from the
topics with only one partition. Initially both these topics are
fetched last and then they move forward in every subsequent fetch
request until at some point they are among the first 50 (if run with
default settings for max.partition.fetch.bytes and fetch.max.bytes and
assume all partitions contain enough data to satisfy
max.partition.fetch.bytes) and receive data. In my test scenario this
takes 24 fetch requests. To better illustrate this we can set
max.fetch.bytes = max.partition.fetch.bytes which causes every fetch
request to only contain data from one partition.

The consumer logs the order at debug level, so we can check progress
in the output:

2019-01-11 14:10:52 DEBUG Fetcher:195 - [Consumer clientId=consumer-1,
groupId=cg1547212251445] Sending READ_UNCOMMITTED fetch for partitions
[aaa-33, aaa-34, aaa-35, ... , zzz-90, zzz-24, mmm-0, 000-0] to broker
localhost:9092 (id: 0 rack: null)
2019-01-11 14:10:52 DEBUG Fetcher:195 - [Consumer clientId=consumer-1,
groupId=cg1547212251445] Sending READ_UNCOMMITTED fetch for partitions
[aaa-34, aaa-35, aaa-36, ... , zzz-24, mmm-0, 000-0, aaa-33] to broker
localhost:9092 (id: 0 rack: null)
2019-01-11 14:10:53 DEBUG Fetcher:195 - [Consumer clientId=consumer-1,
groupId=cg1547212251445] Sending READ_UNCOMMITTED fetch for partitions
[aaa-35, aaa-36, aaa-37, ... , mmm-0, 000-0, aaa-33, aaa-34] to broker
localhost:9092 (id: 0 rack: null)
...
2019-01-11 14:12:58 DEBUG Fetcher:195 - [Consumer clientId=consumer-1,
groupId=cg1547212251445] Sending READ_UNCOMMITTED fetch for partitions
[zzz-90, zzz-24, mmm-0, 000-0, ... , zzz-88, zzz-22, zzz-89, zzz-23]
to broker localhost:9092 (id: 0 rack: null)
2019-01-11 14:12:58 DEBUG Fetcher:195 - [Consumer clientId=consumer-1,
groupId=cg1547212251445] Sending READ_UNCOMMITTED fetch for partitions
[zzz-24, mmm-0, 000-0, aaa-33, ... , zzz-22, zzz-89, zzz-23, zzz-90]
to broker localhost:9092 (id: 0 rack: null)
2019-01-11 14:12:58 DEBUG Fetcher:195 - [Consumer clientId=consumer-1,
groupId=cg1547212251445] Sending READ_UNCOMMITTED fetch for partitions
[mmm-0, 000-0, aaa-33, aaa-34, ... , zzz-89, zzz-23, zzz-90, zzz-24]
to broker localhost:9092 (id: 0 rack: null)

So I'll withdraw my suggestions around code improvements, as this is
already being handled well. The question around best practice for
handling something like this remains though. If anybody has any
suggestions I'd love to hear them!

Best regards,
Sönke

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

On Wed, Jan 9, 2019 at 4:35 PM Sönke Liebau <soenke.lie...@opencore.com> wrote:
>
> Hi all,
>
> we've just had a case where we suspect that messages get delayed from
> being consumed under certain circumstances. I don't necessarily think
> this is a bug, hence have not opened a jira yet but wanted to discuss
> here - there's probably a best practice that I just don't know about.
>
> The scenario is having one consumer that is subscribed to a large
> number of partitions, some of which are very busy and some of which
> only receive messages sporadically. When the consumer now sends a
> fetchrequest for all subscribed partitions the broker starts filling
> these partition by partition while honoring two parameters:
>
> max.partition.fetch.bytes - controls the maximum size of the data that
> is returned for one individual partition - default: 1 * 1024 * 1024 =
> 1048576 bytes
> fetch.max.bytes - controls the overall maximum size of data that is
> returned for the entire fetchrequest - default: 50 * 1024 * 1024 =
> 52428800 bytes
>
> So by default a fetchresponse can contain data from a maximum of 50
> partitions, which creates the possibility of "freezing out" partitions
> if there are a lot of busy partitions in the subscriptions.
> I've created a small test for this to illustrate my concern:
>
> Topics:
> 000 - 1 partition - 1 message
> aaa - 100 partitions - 10 Mill. messages
> bbb - 1000 partitions - 50 Mill. messages
> mmm - 1 partition - 1 message
> zzz - 100 partitions - 10 Mill messages
>
> When I consume from these with default settings and simply print the
> time I first receive a message from a topic I get the following:
> Got first record from topic aaa after 747 ms
> Got first record from topic bbb after 2764 ms
> Got first record from topic zzz after 15068 ms
> Got first record from topic 000 after 16588 ms
> Got first record from topic mmm after 16588 ms
>
> So as we can see the topics with only one partition get stuck behind
> the larger topics with data to be read. I am unsure in what order the
> broker iterates over the partitions, but I've always seen the same
> general order in the output, so there seems to be some factor
> influencing this.
> One potential fix that I identified was to reduce the
> max.partition.fetch.bytes parameter, so that more partitions can be
> included in a fetchresponse. If I rerun the test with a value of 1024
> I get:
>
> Got first record from topic aaa after 5446 ms
> Got first record from topic bbb after 5469 ms
> Got first record from topic zzz after 5744 ms
> Got first record from topic mmm after 5762 ms
> Got first record from topic 000 after 5762 ms
>
> Which looks much better, but I have doubts whether this is the actual
> solution as this could lead to an increase in the number of fetch
> requests that are being sent, when only a few partitions have new
> data:
> 5 Partitions with 10mb of new data each would fit in 10 requests with
> default settings, but need 10240 with my adjusted settings.
>
> This topic is currently also being discussed in the thread on KIP-349
> [1] but consensus seems to be that there is no real need for a feature
> like this.
>
> Are there common patterns to get around this? The obvious solution
> would be scaling the load across more consumers of course, either by
> adding them to the consumer group or by splitting the topics over
> consumers, but that sort of just makes it a question of scale until it
> may happen again.
>
> Would it potentially be worthwhile looking into code changes to
> improve handling for these edge cases?
> Keeping track of the last time partitions returned data for a consumer
> group and prioritizing "oldest" partitions for example. This would
> need memory on the broker though which might turn out to be quite a
> lot since it would scale with partition count and consumer groups.
> Alternatively some sort of feedback to the consumer could be added
> about partitions that were not checked due to the limits, but that
> would need a wire protocol change.
> Perhaps a little consumer side logic that starts fragmenting fetch
> requests if it notices that responses always have data from the
> maximum number of partitions.
>
> Best regards,
> Sönke
>
> [1] 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics



-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany

Reply via email to