David, I'd look first at speeding up the processing downstream of the
consumer, i.e. whatever logic writes to HDFS, and in particular at reducing
blocking there. That is more likely to be the bottleneck than the consumer
itself. Some ideas that I've had success with:

- turn off auto-commit (enable.auto.commit=false)
- poll() in a tight loop and push records to a BlockingQueue for downstream
processing
- batch and/or pipeline complex transformations, e.g. encoding, compression
- kick off asynchronous tasks for anything that needs to block, e.g.
writing to HDFS
- keep track of your own offsets, i.e. which records have made it safely to
HDFS
- periodically commitAsync() those specific offsets
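
A minimal sketch of the first few bullets, assuming stdlib Python purely for
illustration: one thread polls and pushes each batch onto a bounded
queue.Queue (Python's analogue of a BlockingQueue), and a writer thread hands
each batch to a thread pool so blocking writes never stall the poll loop.
FakeConsumer, run_pipeline and the write function are hypothetical stand-ins,
not Kafka client APIs.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel telling the writer thread to stop


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer.

    poll() returns a batch of (partition, offset, value) tuples. In this
    sketch an empty batch means the fake is drained; a real consumer would
    simply keep polling.
    """

    def __init__(self, records, batch_size=3):
        self._records = list(records)
        self._batch_size = batch_size

    def poll(self):
        batch = self._records[:self._batch_size]
        self._records = self._records[self._batch_size:]
        return batch


def poll_loop(consumer, handoff):
    """Poll in a tight loop and hand each batch to the bounded queue."""
    while True:
        batch = consumer.poll()
        if not batch:
            handoff.put(_DONE)
            return
        # Blocks only when the queue is full, which gives natural backpressure.
        handoff.put(batch)


def writer_loop(handoff, pool, write_fn, futures):
    """Drain the queue and run each (blocking) batch write on the pool."""
    while True:
        batch = handoff.get()
        if batch is _DONE:
            return
        futures.append(pool.submit(write_fn, batch))


def run_pipeline(consumer, write_fn, queue_size=4, workers=2):
    """Wire poll loop -> bounded queue -> async writers; return write results."""
    handoff = queue.Queue(maxsize=queue_size)
    futures = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        writer = threading.Thread(
            target=writer_loop, args=(handoff, pool, write_fn, futures))
        writer.start()
        poll_loop(consumer, handoff)  # the consumer stays on this one thread
        writer.join()
        # result() re-raises any exception from a failed write.
        return [f.result() for f in futures]
```

With a real KafkaConsumer, blocking in put() for longer than
max.poll.interval.ms would get the consumer kicked out of the group, so it's
safer to pause() the assigned partitions while the queue is full and resume()
them once it drains. KafkaConsumer is not thread-safe, so poll() (and the
commits) should stay on the polling thread, as above.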

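And a sketch of the last two bullets, again stdlib Python with a
hypothetical OffsetTracker (not part of any Kafka client): it records which
offsets have safely reached HDFS and works out which specific offset per
partition is safe to commit. Because async writes can finish out of order,
it only advances to the lowest offset still in flight.

```python
import threading


class OffsetTracker:
    """Tracks which offsets have safely reached the sink, per partition.

    The safe commit position for a partition is the lowest offset still in
    flight, or the highest offset handed off + 1 once nothing is in flight.
    (A Kafka commit names the *next* offset to read, hence the + 1.)
    Assumes each partition's records are handed off in offset order, as a
    consumer returns them.
    """

    def __init__(self):
        self._lock = threading.Lock()  # finished() is called from writer threads
        self._pending = {}    # partition -> offsets handed off, not yet durable
        self._highest = {}    # partition -> highest offset handed off so far
        self._committed = {}  # partition -> last position committed (or baseline)

    def started(self, partition, offset):
        with self._lock:
            self._pending.setdefault(partition, set()).add(offset)
            self._highest[partition] = max(offset, self._highest.get(partition, offset))
            # Baseline: nothing at or past the first offset seen is durable yet.
            self._committed.setdefault(partition, offset)

    def finished(self, partition, offset):
        with self._lock:
            self._pending[partition].discard(offset)

    def offsets_to_commit(self):
        """Return {partition: next offset} for partitions whose safe position moved."""
        with self._lock:
            result = {}
            for partition, highest in self._highest.items():
                pending = self._pending[partition]
                safe = min(pending) if pending else highest + 1
                if safe != self._committed[partition]:
                    result[partition] = safe
            return result

    def mark_committed(self, offsets):
        with self._lock:
            self._committed.update(offsets)
```

With the Java client you'd turn each entry into a TopicPartition and
OffsetAndMetadata pair, pass the map to consumer.commitAsync(offsets,
callback) from the polling thread every so often, and call mark_committed()
from the callback once it reports success.
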
Ryanne

On Tue, Jul 16, 2019 at 1:47 PM David Watzke <da...@watzke.cz> wrote:

> Hi list,
>
> I have a custom Kafka consumer app dumping data from various topics to
> HDFS. My Kafka cluster consists of 5 physical nodes (56 CPU threads,
> 384 GB RAM, RAID5s). The consumer is a 28-instance app with 20 consumer
> threads in each instance (all using the same consumer group).
>
> This app reads about 100 different topics; some of them receive lots of
> messages while others are less busy.
>
> I've started hitting an issue now that some of the topics receive
> messages at an even higher rate. According to the metrics, consumption
> of the busiest topic falls behind, and the lag keeps increasing during
> peak traffic hours.
>
> For now I was able to reduce the lag and eventually catch up by setting
> fetch.min.bytes from 40960 back to the default of 1 and restarting all
> my app instances, but I'm pretty sure the problem will come back soon.
>
> I don't think adding more partitions to the topic would solve the
> problem, as the topic already has lots of them: there are more consumer
> threads available than this lagging topic has partitions.
>
> I think I need to optimize the consumer somehow so that it fetches more
> data in each pass, when that data is available.
>
> So far I have been fiddling with various options such as fetch.min.bytes,
> max.poll.records, max.poll.interval.ms, and
> partition.assignment.strategy, but every time there's an actual issue
> it comes down to guesswork and trial and error.
>
> Do you have any tips for me from an actual production environment with
> huge traffic?
>
> I stumbled upon this nice blog post:
> https://blog.devcaffeine.com/2016/11/kafka-partition-lag/
>
> In the "One More ‘Why?’" section it talks about the situation I was
> trying to describe:
>
>  > [assuming we have a consumer reading multiple partitions] when
> partition X doesn’t have any messages, it blocks. To avoid heavy,
> constant network chatter, the consumer will block for 10 seconds,
> waiting for the server to accumulate enough messages to fill a chunk.
> If, after the timeout, there are no messages, the consumer shrugs, and
> moves on to the next partition.
>
> However, from the article I am not sure which properties I am supposed
> to change. In the blog post's terminology, which properties control the
> 'timeout' and the 'chunk' size?
>
> Thanks in advance for any pointers.
>
> Cheers,
>
> --
> David Watzke
>
>
