Also, I wonder if this issue is related: https://issues.apache.org/jira/browse/KAFKA-3135
-Jay

On Sun, Apr 10, 2016 at 8:58 AM, Jay Kreps <j...@confluent.io> wrote:

> Two things:
>
> 1. Caching data in the processor is a bit dangerous since it will be
> lost on failure. Nonetheless, I think you have a point that we should
> ideally close the processors first, then commit, in case they send any
> messages on close.
>
> 2. The issue you describe shouldn't happen for the reason you describe.
> Both the broker and the consumer handle batches of messages, so fetching
> a single 1 MB message versus 1024 1 KB messages should be the same. The
> proposed max.poll.messages would just affect how many records are handed
> out; they will have been fetched and be in memory in the consumer no
> matter what. I wonder if you could help us track down what's happening
> for you--maybe provide a simple test case that reproduces the problem?
>
> On Sun, Apr 10, 2016 at 6:13 AM, Michael D. Coon
> <mdco...@yahoo.com.invalid> wrote:
>
>> Guozhang,
>>
>> Yes, I'm merging message contents into larger messages before sending
>> them to the producer. We have demonstrated that many tiny messages of
>> < 1 KB cause a tremendous slowdown on the downstream consumers--not
>> because of memory contention, but because the broker fills up the max
>> fetch request size by adding hundreds of thousands of tiny messages to
>> the fetch response. The consumer then has to deal with all of those
>> messages, and it causes huge latency problems; it takes > 5 seconds per
>> fetch to return from the broker in most cases. In contrast, when I
>> merge messages into bundled single messages with larger payloads, we
>> get excellent throughput because there is less polling and the number
>> of messages is reduced.
>>
>> I'm locked in a battle between fetch-size constraints and max-message-
>> size constraints: my max message size can spike over 5 MB for a single
>> (non-merged) message, but most of the time it's < 1 KB. That's just the
>> kind of data set we're dealing with. So I can't set the fetch size too
>> low, or one of these larger messages will come in and stop the consumer
>> from being able to process anything.
>>
>> So we either need a way to tell the broker not to fill the max fetch
>> size before returning (max.poll.messages), or I need a way to flush to
>> the producer before the framework closes it. The latter offers the
>> benefit of flushing data that may be the result of processing input
>> data whose offsets were already committed asynchronously.
>>
>> Mike
>>
>> On Saturday, April 9, 2016 2:27 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> Mike,
>>
>> It's not clear what you mean by "buffering up the contents". The
>> producer itself already does some buffering and batching when sending
>> to Kafka. Did you actually "merge" multiple small messages into one
>> large message before giving it to the producer in the app code? In
>> either case, I am not sure how it would help with the downstream
>> consumer memory-pressure issue.
>>
>> About bounding the consumer memory usage, we already have some thoughts
>> on that issue and plan to add a memory-bounding feature like the
>> producer's in the near future
>> (https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a
>> problem for long. As for the "max.poll.messages" config and 0.10.0,
>> just FYI, we are shooting to have it released at the end of this month.
>>
>> Guozhang
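A minimal sketch of the two consumer settings in tension above, assuming
a 0.10-style Java client. The broker address, group id, and sizes are
illustrative placeholders; "max.poll.records" is the name under which the
max.poll.messages proposal eventually shipped (KIP-41, 0.10.0):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class FetchTuningSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "bundling-consumer");       // placeholder
            // Must stay above the largest single message (~5 MB in Mike's
            // data), so one fetch can still return hundreds of thousands
            // of sub-1 KB messages.
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
            // Caps how many records poll() hands back at once; the records
            // are still fetched into consumer memory regardless.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            consumer.close();
        }
    }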
>> On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon
>> <mdco...@yahoo.com.invalid> wrote:
>>
>>> Guozhang,
>>>
>>> In my processor, I'm buffering up the contents of the final messages
>>> in order to make them larger. This is to optimize throughput and keep
>>> tiny messages from being injected downstream. So nothing is pushed to
>>> the producer until my configured thresholds are met in the buffering
>>> mechanism. As it stands, these messages are left dangling after the
>>> producer closes and, even worse, if periodic commits are happening
>>> behind the scenes, the data is lost on restart.
>>>
>>> What we need is a way to notify the processors that everything is
>>> "about" to close, so that I can properly flush what I have in memory
>>> out to the producer. Otherwise, I'm stuck always sending tiny messages
>>> into Kafka, which I know for certain causes problems on downstream
>>> consumers (they set a high fetch memory size, which causes hundreds of
>>> thousands of messages to be retrieved at a time and thus bogs down the
>>> consumer). I think the "max.poll.messages" setting we discussed before
>>> would help here, but if it's not available until 0.10, I'm kind of
>>> stuck.
>>>
>>> Another option might be to disable periodic commits and only commit
>>> when the processor requests it. This would mitigate some data loss and
>>> is better than nothing. There is still a chance that data in the
>>> RecordQueue not yet sent to my processor would be committed but never
>>> processed in this case.
>>>
>>> Another thought I had was to reduce the max fetch size; however, some
>>> messages can be very large (i.e., the data spikes periodically). In
>>> that case, the message size would exceed my lowered max fetch size,
>>> causing the consumer to simply stop consuming. So I'm stuck. Either we
>>> need to roll in max.poll.messages sooner than 0.10, or I need a
>>> callback mechanism letting me know that the producer is about to close
>>> so I can clear my buffers. Ideas?
>>>
>>> Mike
>>>
>>> On Friday, April 8, 2016 8:24 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>> Hi Michael,
>>>
>>> When you call KafkaStreams.close(), it will first trigger a
>>> commitAll() function, which will 1) flush the local state store if
>>> necessary; 2) flush messages buffered in the producer; 3) commit
>>> offsets on the consumer. Then it will close the producer / consumer
>>> clients and shut down the tasks. So when you see the processor's
>>> "close" function triggered, any buffered messages in the producer
>>> should already have been flushed.
>>>
>>> Did you see a different behavior than the one described above?
>>>
>>> Guozhang
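A sketch of the manual-commit idea from Mike's message above, assuming
0.10-line Streams configs. The application id and broker address are
placeholders, and using a very large commit.interval.ms to approximate
"disabled" periodic commits is an assumption, not a documented switch:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ManualCommitConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bundling-app");      // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Push the periodic commit far out, so offsets are committed
            // (effectively) only when a processor calls
            // ProcessorContext#commit() after flushing its buffered bundle.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
        }
    }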
>>> On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon
>>> <mdco...@yahoo.com.invalid> wrote:
>>>
>>>> All,
>>>>
>>>> I'm seeing my processor's "close" method being called AFTER my
>>>> downstream producer has been closed. I had assumed that on close I
>>>> would be able to flush whatever I had been buffering up to send to
>>>> the Kafka topic. In other words, we've seen significant performance
>>>> differences between building flows with small messages and with large
>>>> messages in/out of Kafka, so my processor buffers up messages to a
>>>> threshold and flushes those as a composite message bundle to improve
>>>> downstream processing. But if this close method is called AFTER the
>>>> producer has already been closed, I have no way to actually flush the
>>>> final composite bundles to my topic on shutdown. Is there some way to
>>>> get a call BEFORE producer shutdown occurs?
>>>>
>>>> Mike
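A minimal sketch, in the 0.10-era Processor API, of the buffering pattern
this thread is about. The class name, flush threshold, and naive
concatenation bundling are illustrative; per Jay's caveat above, anything
sitting in this buffer is lost on a crash if the input offsets were
already committed:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class BundlingProcessor implements Processor<byte[], byte[]> {
        private static final int FLUSH_THRESHOLD = 1000; // illustrative
        private final List<byte[]> buffer = new ArrayList<>();
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            context.schedule(60000L); // fire punctuate() so bundles don't go stale
        }

        @Override
        public void process(byte[] key, byte[] value) {
            buffer.add(value);
            if (buffer.size() >= FLUSH_THRESHOLD) {
                flush();
            }
        }

        @Override
        public void punctuate(long timestamp) {
            flush();
        }

        @Override
        public void close() {
            // Only helps if close() runs while the producer is still open;
            // that ordering is the exact question raised in this thread.
            flush();
        }

        private void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            context.forward(null, bundle(buffer)); // downstream sink writes the bundle
            buffer.clear();
            context.commit(); // request a commit now that the bundle is forwarded
        }

        // Naive bundling: concatenate payloads. Real code would frame each
        // message (e.g., with a length prefix) so a consumer can split them.
        private byte[] bundle(List<byte[]> messages) {
            int total = 0;
            for (byte[] m : messages) {
                total += m.length;
            }
            byte[] out = new byte[total];
            int pos = 0;
            for (byte[] m : messages) {
                System.arraycopy(m, 0, out, pos, m.length);
                pos += m.length;
            }
            return out;
        }
    }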