[ https://issues.apache.org/jira/browse/KAFKA-6088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma resolved KAFKA-6088.
--------------------------------
    Resolution: Fixed

Since it was fixed in 0.11.0.0, marking it as fixed.

> Kafka Consumer slows down when reading from highly compacted topics
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6088
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>             Fix For: 0.11.0.0
>
>
> Summary of the issue
> -----
> We found a performance issue with the Kafka Consumer where it gets less efficient if you have frequent gaps in offsets (which happens when there is lots of compaction on the topic).
> The issue is present in 0.10.2.1 and possibly prior. It is fixed in 0.11.0.0.
>
> Summary of cause
> -----
> The fetcher code assumes that there will be no gaps in message offsets. If there are, it does an additional round trip to the broker. For topics with large gaps in offsets, it is possible that most calls to {{poll()}} will generate a round trip to the broker.
>
> Background and details
> -----
> We have a topic with roughly 8 million records. The topic is log compacted. It turns out that most of the initial records in the topic were never overwritten, whereas in the 2nd half of the topic we had lots of overwritten records. That means that for the first part of the topic there are no gaps in offsets, but in the 2nd part of the topic there are frequent gaps in the offsets (due to records being compacted away).
> We have a consumer that starts up and reads the entire topic from beginning to end. We noticed that the consumer would read through the first part of the topic very quickly. When it got to the part of the topic with frequent gaps in offsets, the consumption rate slowed down dramatically. This slowdown was consistent across multiple runs.
> What is happening is this:
> 1) A call to {{poll()}} happens. The consumer goes to the broker and fetches 1MB of data (the default of {{max.partition.fetch.bytes}}). It then returns to the caller just 500 records (the default of {{max.poll.records}}) and keeps the rest of the data in memory to use in future calls to {{poll()}}.
> 2) Before returning the 500 records, the consumer library records the *next* offset it should return. It does so by taking the offset of the last record it returned and adding 1 to it (the offset of the 500th message from the set, plus 1). It calls this the {{nextOffset}}.
> 3) The application finishes processing the 500 messages and makes another call to {{poll()}}. During this call, the consumer library does a sanity check: it checks that the first message of the set *it is about to return* has an offset that matches the value of {{nextOffset}}. That is, it checks whether the 501st record has an offset that is 1 greater than the 500th record's.
> a. If it matches, it returns an additional 500 records and updates {{nextOffset}} to the offset of the 1000th record, plus 1.
> b. If it doesn't match, it throws away the remainder of the 1MB of data that it stored in memory in step 1 and goes back to the broker to fetch an additional 1MB of data, starting at offset {{nextOffset}}.
> In topics that have no gaps (non-compacted topics), the code will always hit code path 3a.
> If the topic has gaps in offsets and a call to {{poll()}} happens to fall onto a gap, the code will hit code path 3b.
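>
> To make steps 1-3 concrete, here is a simplified, hypothetical sketch of the logic described above. It is not the actual {{o.a.k.c.consumer.internals.Fetcher}} code; the class, field, and method names are all invented for illustration.
> {code}
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical sketch of the buffering behaviour described in steps 1-3.
> // Not the real Fetcher internals; all names are illustrative only.
> class FetcherSketch {
>
>     static class Record {
>         final long offset;
>         Record(long offset) { this.offset = offset; }
>     }
>
>     private List<Record> buffered = new ArrayList<>(); // leftover data from the last ~1MB fetch (step 1)
>     private long nextOffset = 0;                       // offset we expect the next returned record to have (step 2)
>
>     List<Record> poll(int maxPollRecords) {
>         // Step 3: sanity check -- does the first buffered record match nextOffset?
>         if (buffered.isEmpty() || buffered.get(0).offset != nextOffset) {
>             // Step 3b: mismatch (e.g. the record at nextOffset was compacted away):
>             // discard the buffered data and make another round trip to the broker.
>             buffered = fetchFromBroker(nextOffset);
>         }
>         // Step 3a: hand back up to max.poll.records records from the buffer.
>         int n = Math.min(maxPollRecords, buffered.size());
>         List<Record> batch = new ArrayList<>(buffered.subList(0, n));
>         buffered = new ArrayList<>(buffered.subList(n, buffered.size()));
>         if (!batch.isEmpty()) {
>             // Step 2: nextOffset = offset of the last returned record, plus 1.
>             nextOffset = batch.get(batch.size() - 1).offset + 1;
>         }
>         return batch;
>     }
>
>     private List<Record> fetchFromBroker(long fromOffset) {
>         // Stand-in for a real fetch request; the broker returns whatever records
>         // still exist at or after fromOffset (up to max.partition.fetch.bytes).
>         return new ArrayList<>();
>     }
> }
> {code}
> On a heavily compacted topic the first buffered offset frequently differs from {{nextOffset}}, so the 3b branch (a full discard and refetch) is taken over and over.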
> If the gaps are frequent, it will frequently hit code path 3b.
> The worst case scenario is when you have a large number of gaps and you run with {{max.poll.records=1}}. Every gap will result in a new fetch to the broker, and you may end up processing only one message per fetch. Said another way, you will end up doing a single fetch for every single message in the partition.
>
> Repro
> -----
> We created a repro. It appears that the bug is in 0.10.2.1 but was fixed in 0.11. I've attached the tarball with all the code and instructions.
> The repro is:
> 1) Create a single-partition topic with log compaction turned on
> 2) Write messages with the following keys: 1 1 2 2 3 3 4 4 5 5 ... (each message key written twice in a row)
> 3) Let compaction happen. This means that offsets 0, 2, 4, 6, 8, 10, ... would be compacted away
> 4) Consume from this topic with {{max.poll.records=1}}
> More concretely, here is the producer code:
> {code}
> // Minimal producer config (assumed values; the full setup is in the attached tarball)
> Properties props = new Properties();
> props.put("bootstrap.servers", "192.168.0.105:9092");
> props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>
> Producer<String, String> producer = new KafkaProducer<String, String>(props);
> for (int i = 0; i < 1000000; i++) {
>     producer.send(new ProducerRecord<String, String>("compacted", Integer.toString(i), Integer.toString(i)));
>     producer.send(new ProducerRecord<String, String>("compacted", Integer.toString(i), Integer.toString(i)));
> }
> producer.flush();
> producer.close();
> {code}
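>
> The consumer side of the repro is in the attached tarball; a rough sketch of it, reconstructed here from the description and the debug output below (the group id and poll timeout are illustrative assumptions), looks like this:
> {code}
> Properties props = new Properties();
> props.put("bootstrap.servers", "192.168.0.105:9092");    // broker address as seen in the logs below
> props.put("group.id", "compaction-repro");                // illustrative group id
> props.put("auto.offset.reset", "earliest");               // read the topic from the beginning
> props.put("max.poll.records", "1");                       // step 4: at most one record per poll()
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>
> KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.subscribe(Collections.singletonList("compacted"));
> while (true) {
>     ConsumerRecords<String, String> records = consumer.poll(100);   // 0.10.x-style poll(long timeout)
>     for (ConsumerRecord<String, String> record : records) {
>         System.out.printf("offset = %d, key = %s, value = %s%n",
>                 record.offset(), record.key(), record.value());
>     }
> }
> {code}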
> When consuming with a 0.10.2.1 consumer, you can see this pattern (with Fetcher logs at DEBUG, see file consumer_0.10.2/debug.log):
> {code}
> offset = 1, key = 0, value = 0
> 22:58:51.262 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 3 since the current position is 2
> 22:58:51.263 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 3, key = 1, value = 1
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 5 since the current position is 4
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 5, key = 2, value = 2
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 7 since the current position is 6
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 7, key = 3, value = 3
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 9 since the current position is 8
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 9, key = 4, value = 4
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 11 since the current position is 10
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 11, key = 5, value = 5
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 13 since the current position is 12
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 13, key = 6, value = 6
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 15 since the current position is 14
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> {code}
> When consuming with a 0.11.0.1 consumer, you can see the following pattern (see file consumer_0.11/debug.log):
> {code}
> offset = 1, key = 0, value = 0
> offset = 3, key = 1, value = 1
> offset = 5, key = 2, value = 2
> offset = 7, key = 3, value = 3
> offset = 9, key = 4, value = 4
> offset = 11, key = 5, value = 5
> offset = 13, key = 6, value = 6
> offset = 15, key = 7, value = 7
> offset = 17, key = 8, value = 8
> offset = 19, key = 9, value = 9
> offset = 21, key = 10, value = 10
> {code}
> From looking at the GitHub history, it appears it was fixed in
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0
> Specifically, this line
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0L930
> was replaced by this line:
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0R933
>
> Mitigation
> -----
> This problem is fixed in 0.11.0.0. If you can upgrade to 0.11.0.0, you will not be affected by the problem.
> If you cannot upgrade to 0.11.0.0, you can reduce the impact by increasing the value of {{max.poll.records}}. This works because the check happens on each call to {{poll()}}, so increasing {{max.poll.records}} reduces the number of calls to {{poll()}} and therefore the number of checks and extra fetches.
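>
> As an illustration (the value is an arbitrary example, not a recommendation), the repro consumer sketched above would be configured along these lines on a pre-0.11 consumer:
> {code}
> // A larger max.poll.records (default is 500) means fewer poll() calls per fetched
> // batch, so the offset check -- and the extra broker round trip it can trigger
> // on a gap -- happens less often. 5000 is an arbitrary illustrative value.
> props.put("max.poll.records", "5000");
> {code}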