Hi CJ,

I recently ran into a Kafka message-size-related issue myself and did some digging around to understand the system. I'll summarize what I found and hope it helps you.
Each consumer connector has fetcher threads and a fetcher manager thread associated with it. A fetcher thread talks to the Kafka brokers and gets the data for the consumer. The fetcher thread gets its partition assignment after the rebalance operation. So say each consumer owns N partitions of a topic, and M (M < N) of those partitions are on Broker i (Broker i is the leader of these partitions); the fetcher thread then sends Broker i a single request for the data of all M partitions.

The Kafka protocol is designed such that the maximum amount of data transferred to one client in a single request must be less than 2 GB (the 2 GB also includes the protocol overhead, but that is only a few bytes and can be ignored for now).

The data requested by the fetcher thread is in units of chunks, one chunk per partition. Each chunk is *fetch.message.max.bytes* in size, a parameter in the consumer configuration, and each chunk can contain many messages. If a very large message of, say, 200 MB needs to be consumed, then *fetch.message.max.bytes* must be at least 200 MB, because incomplete messages are not allowed (i.e., one large message cannot be broken into multiple pieces and transferred to the client).

Since the fetcher thread requests *fetch.message.max.bytes* per partition, it is very easy to run into a situation where the total amount of data it requests crosses 2 GB. This results in a situation where the consumer gets no data from the broker.

The data transferred from the broker is put into a BlockingQueue <http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html>. The consumer thread blocks on the queue until the fetcher thread puts some data into it. If you specify a timeout (*consumer.timeout.ms*) during initialization of the client, the consumer thread will wait at most *consumer.timeout.ms* for data and then throw a timeout exception (ConsumerTimeoutException). If *consumer.timeout.ms* is -1 (the default value), the consumer thread will block until the fetcher queues some data.

Kafka supports compression. Compression happens on the producer end, and messages are decompressed by the consumer. The decompression happens only when a message is processed by the consumer thread, not while it is being added to the blocking queue (i.e., decompression is done by the consumer thread, not by the fetcher thread). So *fetch.message.max.bytes* should be the maximum message size after compression. Compression is therefore also a way to work around the Kafka protocol's limitation of 2 GB per request.

So, to summarize and to answer your question: there is no way to get a large message with a small *fetch.message.max.bytes*. I have appended a few illustrative sketches below your quoted mail to make some of this concrete.

Thanks,
Dinesh

On 8 February 2015 at 21:09, Cj <cjwool...@gmail.com> wrote:

> Hi Kafka team,
>
> We have a use case where we need to consume from ~20 topics (each with 24
> partitions). We have a potential max message size of 20MB, so we've set our
> consumer fetch.size to 20MB, but that's causing very poor performance on
> our consumer (most of our messages are in the 10-100k range). Is it
> possible to set the fetch size to a lower number than the max message size
> and gracefully handle larger messages (as a trapped exception for example)
> in order to improve our throughput?
>
> Thank you in advance for your help
> CJ Woolard
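Sketch 1: to make the 2 GB overflow concrete, here is a back-of-envelope check. The chunk size and partition count are made-up numbers for illustration, not from any real setup; I am assuming the ~2 GB cap corresponds to Integer.MAX_VALUE (a signed 32-bit size field).

    // Worst case: one fetch response carries a full chunk for every
    // partition this broker leads for the consumer. Numbers are hypothetical.
    public class FetchSizeCheck {
        public static void main(String[] args) {
            long chunkBytes = 200L * 1024 * 1024; // fetch.message.max.bytes = 200 MB
            int partitionsOnBroker = 12;          // partitions led by one broker
            long worstCaseResponse = chunkBytes * partitionsOnBroker;

            // 200 MB x 12 = 2400 MB, which is over the ~2 GB cap.
            System.out.printf("worst case = %d MB, exceeds 2 GB cap: %b%n",
                    worstCaseResponse / (1024 * 1024),
                    worstCaseResponse > Integer.MAX_VALUE);
        }
    }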
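Sketch 2: a minimal consumer-side example with the 0.8 high-level consumer API, showing *fetch.message.max.bytes* sized for a 20 MB message and the ConsumerTimeoutException behaviour of *consumer.timeout.ms*. The ZooKeeper address, group id and topic name are placeholders.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class LargeMessageConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "cj-group");                // placeholder
            // Must be >= the largest (compressed) message you expect, here 20 MB.
            props.put("fetch.message.max.bytes", String.valueOf(20 * 1024 * 1024));
            // Wait at most 5 s for data; -1 (the default) would block forever.
            props.put("consumer.timeout.ms", "5000");

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
            ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
            try {
                while (it.hasNext()) {
                    // Decompression happens here, in the consumer thread.
                    byte[] payload = it.next().message();
                    // ... process payload ...
                }
            } catch (ConsumerTimeoutException e) {
                // Thrown when no message arrived within consumer.timeout.ms.
            } finally {
                connector.shutdown();
            }
        }
    }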
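Sketch 3: since compression is a producer-side setting, here is a sketch with the 0.8 producer API. The broker list and topic are placeholders, and compression.codec also accepts "snappy".

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class CompressedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092"); // placeholder
            props.put("compression.codec", "gzip"); // compress on the producer side
            props.put("serializer.class", "kafka.serializer.DefaultEncoder");

            Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(props));
            // The consumer's fetch.message.max.bytes is sized against the
            // compressed payload, which is what travels over the wire.
            producer.send(new KeyedMessage<>("my-topic", "large payload...".getBytes()));
            producer.close();
        }
    }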