Also, I wonder if this issue is related:
https://issues.apache.org/jira/browse/KAFKA-3135

-Jay

On Sun, Apr 10, 2016 at 8:58 AM, Jay Kreps <j...@confluent.io> wrote:

> Two things:
> 1. Caching data in the processor is a bit dangerous since it will be lost
> on failure. Nonetheless, I think you have a point that we should ideally
> close the processors first, then commit in case they send any messages on
> close.
> 2. The issue you describe shouldn't happen for the reason you describe.
> Both the broker and the consumer handle batches of messages so fetching a
> single 1 MB message versus 1024 1KB messages should be the same. The
> proposed max.poll.messages would just effect how many records are handed
> out they will have been fetched and be in memory in the consumer no matter
> what. I wonder if you could help us trace down what's happening for
> you--maybe provide a simple test case that reproduces the problem?
>
>
> On Sun, Apr 10, 2016 at 6:13 AM, Michael D. Coon <
> mdco...@yahoo.com.invalid> wrote:
>
>> Guozhang,
>>    Yes, I'm merging message contents into larger messages before sending
>> to the producer. We have demonstrated that many tiny messages of < 1K
>> causes tremendous slow down on the down stream consumers. Not because of
>> memory contention but because of the broker filling up the max fetch
>> request size by adding hundreds of thousands of tiny messages to the fetch
>> response. The consumer then has to deal with those messages and it causes
>> huge latency problems….the broker has to add those hundreds of thousands of
>> messages to the response. It takes > 5 seconds per fetch to return from the
>> broker in most cases. In contrast, when I merge messages into bundled
>> single-messages with larger payloads, we get excellent throughput because
>> there is less polling and the number of messages is reduced.
>>    I'm locked into a battle between fetch size constraints and max
>> message size constraints…my max message size can actually spike over 5MB
>> for a single message (non-merged) but most of the time it's < 1K. That's
>> just the kind of data set we're dealing with. So I can't set fetch size too
>> low or one of these larger messages will come in and break the consumer
>> from being able to process anything.
>>    So we either need a way to tell the broker not to fill the max fetch
>> size before returning (max.poll.messages) or I need a way to flush to the
>> producer when it's about to close my producer. The latter offers the
>> benefit of flushing data that may be the results of processing input data
>> whose offsets were already committed asynchronously.
>> Mike
>>
>>     On Saturday, April 9, 2016 2:27 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>>
>>  Mike,
>>
>> Not clear what do you mean by "buffering up the contents". Producer itself
>> already did some buffering and batching when sending to Kafka. Did you
>> actually "merge" multiple small messages into one large message before
>> giving it to the producer in the app code? In either case, I am not sure
>> how it will help the downstream consumer memory pressure issue?
>>
>> About bounding the consumer memory usage, we already have some thoughts
>> about that issue and plan to add the memory bounding feature like the
>> producer does in the near future (
>> https://issues.apache.org/jira/browse/KAFKA-2045), so it won't be a
>> problem
>> for long. And for the "max.poll.messages" config and 0.10.0, just FYI we
>> are shooting to have it released end of this month.
>>
>> Guozhang
>>
>>
>> On Sat, Apr 9, 2016 at 5:59 AM, Michael D. Coon <mdco...@yahoo.com.invalid
>> >
>> wrote:
>>
>> > Guozhang,
>> >    In my processor, I'm buffering up contents of the final messages in
>> > order to make them larger. This is to optimize throughput and avoid tiny
>> > messages from being injected downstream. So nothing is being pushed to
>> the
>> > producer until my configured thresholds are met in the buffering
>> mechanism.
>> > So as it stands, these messages are left dangling after the producer
>> closes
>> > and, even worse, if periodic commits are happening behind the scenes,
>> the
>> > data is lost on restart.
>> >    What we need is a way to notify the processors that everything is
>> > "about" to close so that I can properly flush what I have in memory out
>> to
>> > the producer. Otherwise, I'm stuck with always sending tiny messages
>> into
>> > kafka--which I know for certain causes problems on down stream consumers
>> > (where they set a high fetch memory size and it causes hundreds of
>> > thousands of messages to be retrieved at a time…and thus bogs down the
>> > consumer). I think the "max.poll.messages" setting we discussed before
>> > would help here but if it's not available until 0.10, I'm kind of stuck.
>> >    Another option might be to disable periodic commits and only commit
>> > when the processor requests it. This would mitigate some data loss and
>> is
>> > better than nothing. There is still a chance that data in RecordQueue
>> not
>> > yet sent to my processor would be committed but never processed in this
>> > case.
>> >    Another thought I had was to reduce the max fetch size; however, some
>> > messages can be very large (i.e. data spikes periodically). In this
>> case,
>> > the messages size would exceed my lower max fetch size causing the
>> consumer
>> > to simply stop consuming. So I'm stuck. So either we need to roll in the
>> > max.poll.messages sooner than 0.10 or maybe a callback mechanism
>> letting me
>> > know that the producer is about to close so I can clear my buffers.
>> >    Ideas?
>> > Mike
>> >
>> >    On Friday, April 8, 2016 8:24 PM, Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> >
>> >
>> >  Hi Michael,
>> >
>> > When you call KafkaStreams.close(), it will first trigger a commitAll()
>> > function, which will 1) flush local state store if necessary; 2) flush
>> > messages buffered in producer; 3) commit offsets on consumer. Then it
>> will
>> > close the producer / consumer clients and shutdown the tasks. So when
>> you
>> > see processor's "close" function triggered, any buffered messages in the
>> > producer should already been flushed.
>> >
>> > Did you see a different behavior than the above described?
>> >
>> > Guozhang
>> >
>> >
>> > On Fri, Apr 8, 2016 at 12:23 PM, Michael D. Coon
>> <mdco...@yahoo.com.invalid
>> > >
>> > wrote:
>> >
>> > > All,
>> > >    I'm seeing my processor's "close" method being called AFTER my
>> > > downstream producer has been closed. I had assumed that on close I
>> would
>> > be
>> > > able to flush whatever I had been buffering up to send to kafka
>> topic. In
>> > > other words, we've seen significant performance differences in
>> building
>> > > flows with small messages and large messages in/out of kafka. So my
>> > > processor buffers up messages to a threshold and flushes those as a
>> > > composite message bundle to improve downstream processing. But if this
>> > > close method is called AFTER the producer has already been closed, I
>> > would
>> > > have no way to actually flush the final composite bundles to my topic
>> on
>> > > shutdown. Is there some way to get a call BEFORE producer shutdown
>> > occurs?
>> > > Mike
>> > >
>> > >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>> >
>> >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>>
>>
>
>

Reply via email to