Hi again,
For the record, I filed an issue about this here:
https://issues.apache.org/jira/browse/KAFKA-2986

Cheers,
Jens

– Sent from Mailbox

On Fri, Dec 11, 2015 at 7:56 PM, Jens Rantil <jens.ran...@tink.se> wrote:
> Hi,
>
> We've been experimenting a little with running Kafka internally for better
> handling of temporary throughput peaks of asynchronous tasks. However, we've
> had big issues making Kafka work for us and I am starting to question
> whether it's a good fit.
>
> Our use case:
>    - High latency. At peaks, each consumer requires ~20 seconds to handle a
>      single message/task.
>    - Extreme variation in message size: serialized tasks range from
>      ~300 bytes up to ~3 MB.
>    - A message is generally processed in 20 seconds independent of its
>      size. Most messages are small.
>
> Our setup:
>    - Kafka 0.9.0.
>    - Using the new Java consumer API (consumer groups etc.).
>    - To occasionally handle large (3 MB) messages we've had to set the
>      following configuration parameters:
>       - max.partition.fetch.bytes=10485760 (10 MB) on the consumer to
>         handle larger messages.
>       - session.timeout.ms=30s to handle our high-latency processing.
>       - replica.fetch.max.bytes=10485760 (10 MB) on the broker.
>       - message.max.bytes=10485760 (10 MB) on the broker.
>
> Sample code:
>
>     while (isRunning()) {
>         ConsumerRecords<String, byte[]> records = consumer.poll(100);
>         for (final ConsumerRecord<String, byte[]> record : records) {
>             // Handle record...
>         }
>     }
>
> AFAIK this seems like very basic consumer code.
>
> Initial problem: When doing load testing to simulate peaks, our consumer
> started spinning infinitely in a fashion similar to [1]. We also noticed
> that we were consistently seeing [2] in our broker log.
>
> [1] http://bit.ly/1Q7zxgh
> [2] https://gist.github.com/JensRantil/2d1e7db3bd919eb35f9b
>
> Root cause analysis: AFAIK, heartbeats are only sent to Kafka when
> calling `consumer.poll(...)`. To handle larger messages, we needed to
> increase max.partition.fetch.bytes.
> However, due to our high-latency
> consumer, a large number of small messages could be prefetched, which made
> our inner for loop run long enough for the broker to consider our consumer
> dead.
>
> Two questions:
>    - Is there any workaround to avoid the broker thinking our consumer is
>      dead? Increasing the session timeout to cover the polling interval for
>      small messages is not an option since we simply prefetch too many
>      messages for that to work. Can we set a limit on how many messages
>      Kafka prefetches? Or is there a way to send heartbeats to the broker
>      out of band without invoking the `KafkaConsumer#poll` method?
>    - Is Kafka a bad tool for our use case?
>
> Thanks and have a nice weekend,
> Jens
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
> Facebook <https://www.facebook.com/#!/tink.se> Linkedin
> <http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo&trkInfo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary>
> Twitter <https://twitter.com/tink>
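
PS. For anyone finding this thread later: until the JIRA above is resolved, one workaround on 0.9 for the "heartbeat without poll" question is to pause all assigned partitions while processing and call poll(0) between records; while paused, poll sends heartbeats but returns no records. A minimal sketch, untested against a live cluster (handle() and isRunning() are hypothetical placeholders, and it assumes the 0.9 varargs pause/resume API; note a rebalance during processing would still change the assignment):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

while (isRunning()) {
    ConsumerRecords<String, byte[]> records = consumer.poll(100);

    // Pause fetching so the poll(0) calls below only heartbeat
    // and do not return (or skip over) any further records.
    TopicPartition[] assigned =
        consumer.assignment().toArray(new TopicPartition[0]);
    consumer.pause(assigned);
    try {
        for (final ConsumerRecord<String, byte[]> record : records) {
            handle(record);   // hypothetical slow (~20 s) processing
            consumer.poll(0); // heartbeat; empty result while paused
        }
    } finally {
        consumer.resume(assigned); // fetch again on the next poll(100)
    }
}
```

This keeps the session alive without raising session.timeout.ms to cover an entire prefetched batch.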