Just to clarify, the implementation is basically what I mentioned above
(split/resend plus the evolving compression ratio estimation algorithm),
with the compression ratio estimation changed to be per topic.

Thanks,

Jiangjie (Becket) Qin

On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin <becket....@gmail.com> wrote:

> I went ahead and submitted a patch here:
> https://github.com/apache/kafka/pull/2638
>
> Per Joel's suggestion, I changed the compression ratio estimation to be per
> topic as well. It seems to be working well. Since there is an important
> behavior change and a new sensor is added, I'll keep the KIP and update it
> accordingly.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
>> >
>> > Let's say we sent the batch over the wire and received a
>> > RecordTooLargeException. How do we split it? Once we add a message to
>> > the batch we lose the message-level granularity. We would have to
>> > decompress, do a deep iteration, split, and compress again, right? This
>> > looks like a performance bottleneck for multi-topic producers such as
>> > mirror maker.
>> >
>>
>> Yes, but these should be outliers if we do estimation on a per-topic basis
>> and if we target a conservative-enough compression ratio. The producer
>> should also avoid sending over the wire at all if it can be made aware of
>> the max message size limit on the broker, and split if it determines that
>> a record exceeds the broker's config. Ideally this should be part of topic
>> metadata but is not - so it could be based off a periodic describe-configs
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-DescribeConfigsRequest>
>> (which isn't available yet). This doesn't remove the need to split and
>> recompress though.
>>
>>
>> > On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin <becket....@gmail.com>
>> wrote:
>> >
>> > > Hey Mayuresh,
>> > >
>> > > 1) The batch would be split when a RecordTooLargeException is
>> > > received.
>> > > 2) We do not lower the actual compression ratio, but rather lower the
>> > > estimated compression ratio according to the Actual Compression Ratio
>> > > (ACR).
>> > >
>> > > As an example, let's start with an Estimated Compression Ratio (ECR)
>> > > of 1.0. Say the ACR is ~0.8. Instead of letting the ECR drop to 0.8
>> > > very quickly, we only drop it by 0.001 each time we see ACR < ECR.
>> > > However, once we see ACR > ECR, we increment the ECR by 0.05. If a
>> > > RecordTooLargeException is received, we reset the ECR back to 1.0 and
>> > > split the batch.
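>> > >
>> > > To illustrate, a minimal sketch of that rule in Java. The class
>> > > shape, the names, and the per-topic map are just for illustration,
>> > > not the actual patch:
>> > >
>> > >     import java.util.Map;
>> > >     import java.util.concurrent.ConcurrentHashMap;
>> > >
>> > >     // Illustrative per-topic estimator: slow decrease when we
>> > >     // overestimate, faster increase when we underestimate, and a
>> > >     // full reset when the broker rejects a batch.
>> > >     public class RatioEstimatorSketch {
>> > >         private static final float MAX_RATIO = 1.0f;
>> > >         private static final float DECREASE_STEP = 0.001f;
>> > >         private static final float INCREASE_STEP = 0.05f;
>> > >         private final Map<String, Float> ecr = new ConcurrentHashMap<>();
>> > >
>> > >         // Current estimate for a topic; starts conservatively at 1.0.
>> > >         public float estimate(String topic) {
>> > >             return ecr.getOrDefault(topic, MAX_RATIO);
>> > >         }
>> > >
>> > >         // Called with the observed (actual) ratio of a finished batch.
>> > >         public void observe(String topic, float acr) {
>> > >             float current = estimate(topic);
>> > >             if (acr < current) {
>> > >                 current = Math.max(acr, current - DECREASE_STEP);
>> > >             } else {
>> > >                 current = Math.min(MAX_RATIO, current + INCREASE_STEP);
>> > >             }
>> > >             ecr.put(topic, current);
>> > >         }
>> > >
>> > >         // Called on RecordTooLargeException; the caller also splits
>> > >         // and resends the batch.
>> > >         public void reset(String topic) {
>> > >             ecr.put(topic, MAX_RATIO);
>> > >         }
>> > >     }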
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > >
>> > >
>> > > On Mon, Feb 27, 2017 at 10:30 AM, Mayuresh Gharat <
>> > > gharatmayures...@gmail.com> wrote:
>> > >
>> > > > Hi Becket,
>> > > >
>> > > > Seems like an interesting idea.
>> > > > I had a couple of questions:
>> > > > 1) How do we decide when the batch should be split?
>> > > > 2) What do you mean by slowly lowering the "actual" compression
>> ratio?
>> > > > An example would really help here.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Mayuresh
>> > > >
>> > > > On Fri, Feb 24, 2017 at 3:17 PM, Becket Qin <becket....@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi Jay,
>> > > > >
>> > > > > Yeah, I got your point.
>> > > > >
>> > > > > I think there might be a solution which does not require adding a
>> > > > > new configuration. We can start from a very conservative
>> > > > > compression ratio, say 1.0, and lower it very slowly according to
>> > > > > the actual compression ratio until we hit a point where we have to
>> > > > > split a batch. At that point, we exponentially back off on the
>> > > > > compression ratio. The idea is somewhat like TCP. This should help
>> > > > > avoid frequent splits.
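>> > > > >
>> > > > > As a sketch of what I mean (the halving-toward-1.0 step is just
>> > > > > one possible reading of "exponential" back-off):
>> > > > >
>> > > > >     // Illustrative only: additive decrease while the estimate
>> > > > >     // holds; on a forced split, close half the remaining gap
>> > > > >     // to 1.0 each time, i.e. back off exponentially.
>> > > > >     public class TcpLikeEcr {
>> > > > >         private double ecr = 1.0;      // start conservative
>> > > > >
>> > > > >         public void onActualRatio(double acr) {
>> > > > >             if (acr < ecr) ecr = Math.max(acr, ecr - 0.001);
>> > > > >         }
>> > > > >
>> > > > >         public void onForcedSplit() {
>> > > > >             ecr = (ecr + 1.0) / 2.0;   // halve the gap to 1.0
>> > > > >         }
>> > > > >
>> > > > >         public double estimate() { return ecr; }
>> > > > >     }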
>> > > > >
>> > > > > The upper bound of the batch size is also a little awkward today
>> > > > > because the batch size is based on the compressed size, but users
>> > > > > cannot set it to the max message size because that would result in
>> > > > > oversized messages. With this change we will be able to allow
>> > > > > users to set the batch size close to the max message size.
>> > > > >
>> > > > > However, the downside is that there could be latency spikes in the
>> > > > > system due to the splitting, especially when many messages need to
>> > > > > be split at the same time. That could potentially be an issue for
>> > > > > some users.
>> > > > >
>> > > > > What do you think about this approach?
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jiangjie (Becket) Qin
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Thu, Feb 23, 2017 at 1:31 PM, Jay Kreps <j...@confluent.io>
>> wrote:
>> > > > >
>> > > > > > Hey Becket,
>> > > > > >
>> > > > > > Yeah that makes sense.
>> > > > > >
>> > > > > > I agree that you'd really have to both fix the estimation (i.e.
>> > > > > > make it per topic or make it better estimate the high
>> > > > > > percentiles) AND have the recovery mechanism. If you are
>> > > > > > underestimating often and then paying a high recovery price,
>> > > > > > that won't fly.
>> > > > > >
>> > > > > > I think you take my main point though, which is just that I
>> > > > > > hate to expose these super-low-level options to users because it
>> > > > > > is so hard to explain to people what they mean and how they
>> > > > > > should be set. So if it is possible to make either some
>> > > > > > combination of better estimation and splitting, or better
>> > > > > > tolerance of overage, work, that would be preferable.
>> > > > > >
>> > > > > > -Jay
>> > > > > >
>> > > > > > On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin <
>> becket....@gmail.com
>> > >
>> > > > > wrote:
>> > > > > >
>> > > > > > > @Dong,
>> > > > > > >
>> > > > > > > Thanks for the comments. The default behavior of the producer
>> > > > > > > won't change. If users want to use the uncompressed message
>> > > > > > > size, they will probably also bump up the batch size to
>> > > > > > > somewhere close to the max message size. This would be in the
>> > > > > > > documentation. BTW, the default batch size is 16 KB, which is
>> > > > > > > pretty small.
>> > > > > > >
>> > > > > > > @Jay,
>> > > > > > >
>> > > > > > > Yeah, we actually debated quite a bit internally about the
>> > > > > > > best solution to this.
>> > > > > > >
>> > > > > > > I completely agree it is a bug. In practice we usually leave
>> > > > > > > some headroom to allow the compressed size to grow a little if
>> > > > > > > the original messages are not compressible, for example, 1000
>> > > > > > > KB instead of exactly 1 MB. That is likely safe enough.
>> > > > > > >
>> > > > > > > The major concern for the rejected alternative is performance.
>> > > > > > > It largely depends on how frequently we need to split a batch,
>> > > > > > > i.e. how likely the estimation is to go off. If we only need
>> > > > > > > to do the split work occasionally, the cost would be amortized
>> > > > > > > so we don't need to worry about it too much. However, it looks
>> > > > > > > like for a producer with shared topics, the estimation is
>> > > > > > > always off. As an example, consider two topics, one with
>> > > > > > > compression ratio 0.6 and the other 0.2. Assuming exactly the
>> > > > > > > same traffic, the average compression ratio would be roughly
>> > > > > > > 0.4, which is not right for either of the topics. So almost
>> > > > > > > half of the batches (those of the topic with the 0.6
>> > > > > > > compression ratio) will end up larger than the configured
>> > > > > > > batch size. With more topics, as in mirror maker, this becomes
>> > > > > > > even more unpredictable. To avoid frequent rejection / split
>> > > > > > > of the batches, we need to configure the batch size pretty
>> > > > > > > conservatively. This could actually hurt performance because
>> > > > > > > we are shoehorning the messages that are highly compressible
>> > > > > > > into a small batch so that the other topics that are not that
>> > > > > > > compressible will not become too large with the same batch
>> > > > > > > size. At LinkedIn, our batch size is configured to 64 KB
>> > > > > > > because of this. I think we may actually get better batching
>> > > > > > > if we just use the uncompressed message size and an 800 KB
>> > > > > > > batch size.
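>> > > > > > >
>> > > > > > > To make the arithmetic concrete, a tiny illustrative snippet
>> > > > > > > (the 16 KB batch.size and the ratios are just the numbers from
>> > > > > > > the example above):
>> > > > > > >
>> > > > > > >     // Illustrative only: a shared ratio estimate mis-sizes
>> > > > > > >     // batches for the more compressible topic.
>> > > > > > >     public class SharedRatioExample {
>> > > > > > >         public static void main(String[] args) {
>> > > > > > >             int batchSize = 16 * 1024; // producer batch.size
>> > > > > > >             double sharedEcr = 0.4;    // average of 0.6 and 0.2
>> > > > > > >             double trueRatio = 0.6;    // this topic's real ratio
>> > > > > > >             // The producer appends until the estimated
>> > > > > > >             // compressed size reaches batch.size:
>> > > > > > >             int accepted = (int) (batchSize / sharedEcr);
>> > > > > > >             // What actually goes on the wire for this topic:
>> > > > > > >             int actual = (int) (accepted * trueRatio);
>> > > > > > >             // Prints accepted=40960 actual=24576 limit=16384,
>> > > > > > >             // i.e. ~50% over the configured batch size.
>> > > > > > >             System.out.printf("accepted=%d actual=%d limit=%d%n",
>> > > > > > >                     accepted, actual, batchSize);
>> > > > > > >         }
>> > > > > > >     }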
>> > > > > > >
>> > > > > > > We did not think about loosening the message size restriction,
>> > > > > > > but that sounds like a viable solution given that the consumer
>> > > > > > > can now fetch oversized messages. One concern would be that on
>> > > > > > > the broker side, oversized messages will bring more memory
>> > > > > > > pressure. With KIP-92 we may mitigate that, but the memory
>> > > > > > > allocation for large messages may not be very GC-friendly. I
>> > > > > > > need to think about this a little more.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Jiangjie (Becket) Qin
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps <j...@confluent.io>
>> > > wrote:
>> > > > > > >
>> > > > > > > > Hey Becket,
>> > > > > > > >
>> > > > > > > > I get the problem we want to solve with this, but I don't
>> > > > > > > > think this is something that makes sense as a user-controlled
>> > > > > > > > knob that everyone sending data to Kafka has to think about.
>> > > > > > > > It is basically a bug, right?
>> > > > > > > >
>> > > > > > > > First, as a technical question: is it true that using the
>> > > > > > > > uncompressed size for batching actually guarantees that you
>> > > > > > > > observe the limit? I think that implies that compression
>> > > > > > > > always makes the messages smaller, which I think is usually
>> > > > > > > > true but is not guaranteed, right? E.g. if someone encrypts
>> > > > > > > > their data, which tends to randomize it, and then enables
>> > > > > > > > compression, it could get slightly bigger?
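>> > > > > > > >
>> > > > > > > > A quick, purely illustrative way to check that (gzip over
>> > > > > > > > random, i.e. encrypted-looking, bytes grows slightly because
>> > > > > > > > of framing overhead):
>> > > > > > > >
>> > > > > > > >     import java.io.ByteArrayOutputStream;
>> > > > > > > >     import java.util.Random;
>> > > > > > > >     import java.util.zip.GZIPOutputStream;
>> > > > > > > >
>> > > > > > > >     // "Compressed" output on incompressible input comes out
>> > > > > > > >     // a bit larger than the input.
>> > > > > > > >     public class IncompressibleDemo {
>> > > > > > > >         public static void main(String[] args) throws Exception {
>> > > > > > > >             byte[] data = new byte[16 * 1024];
>> > > > > > > >             new Random(42).nextBytes(data);
>> > > > > > > >             ByteArrayOutputStream out = new ByteArrayOutputStream();
>> > > > > > > >             try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
>> > > > > > > >                 gz.write(data);
>> > > > > > > >             }
>> > > > > > > >             // Prints out > in for random input.
>> > > > > > > >             System.out.printf("in=%d out=%d%n", data.length, out.size());
>> > > > > > > >         }
>> > > > > > > >     }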
>> > > > > > > >
>> > > > > > > > I also wonder if the rejected alternatives you describe
>> > > > > > > > couldn't be made to work: basically try to be a bit better
>> > > > > > > > at estimation and recover when we guess wrong. I don't think
>> > > > > > > > the memory usage should be a problem: isn't it the same
>> > > > > > > > memory usage the consumer of that topic would need? And
>> > > > > > > > can't you do the splitting and recompression in a streaming
>> > > > > > > > fashion? If we can make the estimation miss rate low and the
>> > > > > > > > recovery cost is just ~2x the normal cost for that batch,
>> > > > > > > > that should be totally fine, right? (It's technically true
>> > > > > > > > you might have to split more than once, but since you halve
>> > > > > > > > it each time, you should get a number of halvings that is
>> > > > > > > > logarithmic in the miss size, which, with better estimation,
>> > > > > > > > you'd hope would be super duper small.)
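>> > > > > > > >
>> > > > > > > > A rough sketch of that halving recovery (illustrative only;
>> > > > > > > > the real producer would split at the record-batch level and
>> > > > > > > > recompress as it goes):
>> > > > > > > >
>> > > > > > > >     import java.util.ArrayList;
>> > > > > > > >     import java.util.List;
>> > > > > > > >     import java.util.function.ToIntFunction;
>> > > > > > > >
>> > > > > > > >     // Re-split a rejected batch by halving until every piece
>> > > > > > > >     // fits; each round roughly halves the overshoot, so the
>> > > > > > > >     // number of rounds is logarithmic in the miss size.
>> > > > > > > >     public class BatchSplitter {
>> > > > > > > >         public static <T> List<List<T>> split(
>> > > > > > > >                 List<T> records,
>> > > > > > > >                 ToIntFunction<List<T>> compressedSize,
>> > > > > > > >                 int maxBytes) {
>> > > > > > > >             List<List<T>> fitting = new ArrayList<>();
>> > > > > > > >             if (records.size() <= 1
>> > > > > > > >                     || compressedSize.applyAsInt(records) <= maxBytes) {
>> > > > > > > >                 fitting.add(records);
>> > > > > > > >                 return fitting;
>> > > > > > > >             }
>> > > > > > > >             int mid = records.size() / 2;
>> > > > > > > >             fitting.addAll(split(records.subList(0, mid),
>> > > > > > > >                     compressedSize, maxBytes));
>> > > > > > > >             fitting.addAll(split(records.subList(mid, records.size()),
>> > > > > > > >                     compressedSize, maxBytes));
>> > > > > > > >             return fitting;
>> > > > > > > >         }
>> > > > > > > >     }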
>> > > > > > > >
>> > > > > > > > Alternatively, maybe we could work on the other side of the
>> > > > > > > > problem and try to make it so that a small miss on message
>> > > > > > > > size isn't a big problem. I think the original issue was
>> > > > > > > > that max size and fetch size were tightly coupled, and the
>> > > > > > > > way memory in the consumer worked, you really wanted the
>> > > > > > > > fetch size to be as small as possible because you'd use that
>> > > > > > > > much memory per fetched partition, and the consumer would
>> > > > > > > > get stuck if its fetch size wasn't big enough. I think we
>> > > > > > > > made some progress on that issue, and maybe more could be
>> > > > > > > > done there so that a small bit of fuzziness around the size
>> > > > > > > > would not be an issue?
>> > > > > > > >
>> > > > > > > > -Jay
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin <
>> > > becket....@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi folks,
>> > > > > > > > >
>> > > > > > > > > I would like to start the discussion thread on KIP-126.
>> > > > > > > > > The KIP proposes adding a new configuration to
>> > > > > > > > > KafkaProducer to allow batching based on uncompressed
>> > > > > > > > > message size.
>> > > > > > > > >
>> > > > > > > > > Comments are welcome.
>> > > > > > > > >
>> > > > > > > > > The KIP wiki is here:
>> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > >
>> > > > > > > > > Jiangjie (Becket) Qin
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > -Regards,
>> > > > Mayuresh R. Gharat
>> > > > (862) 250-7125
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -Regards,
>> > Mayuresh R. Gharat
>> > (862) 250-7125
>> >
>>
>
>
