Hey Becket,

I am wondering if we should first vote on the KIP before reviewing the
patch. I have two comments below:

- Should we specify the new sensors as part of the interface changes in the KIP?
- The KIP proposes to increase the estimated compression ratio by 0.05 for
each underestimation and to decrease the estimate by 0.005 for each
overestimation. Why were these two values chosen? I think there is a
tradeoff in selecting them. Can the KIP be more explicit about the tradeoff
and explain how these two values would impact the producer's performance (a
concrete example of what I mean is below)?
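
To make the tradeoff concrete with a back-of-the-envelope example (please
correct me if I am misreading the KIP): starting from an estimate of 1.0
with a true compression ratio of 0.5, it would take roughly
(1.0 - 0.5) / 0.005 = 100 overestimated batches for the estimate to
converge, while a single underestimation bumps the estimate back up by
0.05, undoing ten of those steps. Whether that asymmetry is the right
balance between batching efficiency and split frequency seems worth
spelling out.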

Thanks,
Dong


On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin <becket....@gmail.com> wrote:

> I have updated the KIP based on the latest discussion. Please check and let
> me know if there is any further concern.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > Actually, on second thought, a rate might be better for two reasons:
> > 1. Most of the metrics we already have in the producer use a rate
> > instead of a count.
> > 2. If a service is bounced, the count is reset to 0, but a rate is not
> > affected.
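
For what it's worth, I picture the new sensor being registered as a rate
roughly like this, using the client metrics classes (the sensor and metric
names here are hypothetical, not necessarily what the patch uses):

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

// Hypothetical sensor/metric names, just to show the rate-vs-count choice.
public class BatchSplitMetricExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor splitSensor = metrics.sensor("batch-split");
        MetricName splitRate = metrics.metricName(
            "batch-split-rate", "producer-metrics", "Rate of batch splits per second");
        splitSensor.add(splitRate, new Rate());   // a rate survives a bounce better than a raw count

        splitSensor.record();                     // record one split occurrence
        metrics.close();
    }
}
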
> >
> > I'll make the change.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin <becket....@gmail.com>
> wrote:
> >
> >> Hi Dong,
> >>
> >> Yes, there is a sensor in the patch that tracks the split occurrences.
> >>
> >> Currently it is a count instead of a rate. In practice, a count seems
> >> easier to use in this case, but I am open to changing it.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Fri, Mar 3, 2017 at 7:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >>> Hey Becket,
> >>>
> >>> I haven't looked at the patch yet. But since we are going to try the
> >>> split-on-oversize solution, should the KIP also add a sensor that shows
> >>> the split rate per second and the probability of a split?
> >>>
> >>> Thanks,
> >>> Dong
> >>>
> >>>
> >>> On Fri, Mar 3, 2017 at 6:39 PM, Becket Qin <becket....@gmail.com>
> wrote:
> >>>
> >>> > Just to clarify, the implementation is basically what I mentioned
> >>> > above (split/resend plus the adjusted estimation-evolution algorithm),
> >>> > and it changes the compression ratio estimation to be per topic.
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Jiangjie (Becket) Qin
> >>> >
> >>> > On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>> >
> >>> > > I went ahead and submitted a patch here:
> >>> > > https://github.com/apache/kafka/pull/2638
> >>> > >
> >>> > > Per Joel's suggestion, I changed the compression ratio to be per
> >>> > > topic as well. It seems to be working well. Since there is an
> >>> > > important behavior change and a new sensor is added, I'll keep the
> >>> > > KIP and update it accordingly.
> >>> > >
> >>> > > Thanks,
> >>> > >
> >>> > > Jiangjie (Becket) Qin
> >>> > >
> >>> > > On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy <jjkosh...@gmail.com>
> >>> wrote:
> >>> > >
> >>> > >> >
> >>> > >> > Let's say we sent the batch over the wire and received a
> >>> > >> > RecordTooLargeException. How do we split it, since once we add the
> >>> > >> > message to the batch we lose the message-level granularity? We
> >>> > >> > will have to decompress, do a deep iteration, split, and compress
> >>> > >> > again, right? This looks like a performance bottleneck in the case
> >>> > >> > of multi-topic producers like mirror maker.
> >>> > >> >
> >>> > >>
> >>> > >> Yes, but these should be outliers if we do estimation on a
> >>> > >> per-topic basis and if we target a conservative-enough compression
> >>> > >> ratio. The producer should also avoid sending over the wire if it
> >>> > >> can be made aware of the max message size limit on the broker, and
> >>> > >> split if it determines that a record exceeds the broker's config.
> >>> > >> Ideally this should be part of the topic metadata, but it is not -
> >>> > >> so it could be driven off a periodic describe-configs
> >>> > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-DescribeConfigsRequest>
> >>> > >> (which isn't available yet). This doesn't remove the need to split
> >>> > >> and recompress, though.
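
To make sure I follow the cost being discussed, below is a rough sketch of
what the split path could look like (the class/method names and the
split-by-estimated-size policy are mine for illustration; the actual patch
may simply split the batch in half):

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins only; these are not Kafka's actual classes.
final class SplitSketch {

    static final class Record {
        final byte[] key;
        final byte[] value;

        Record(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }

        int sizeInBytes() {
            return (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
        }
    }

    // Split an oversized batch after a RecordTooLargeException: decompress the
    // batch, deep-iterate record by record, regroup into smaller batches whose
    // estimated compressed size fits the limit, then compress each new batch
    // again. The double compression work is the cost discussed above.
    static List<List<Record>> split(List<Record> decompressedRecords,
                                    int maxCompressedBatchBytes,
                                    double estimatedCompressionRatio) {
        List<List<Record>> batches = new ArrayList<>();
        List<Record> current = new ArrayList<>();
        long uncompressedBytes = 0;

        for (Record record : decompressedRecords) {
            long projected = Math.round(
                (uncompressedBytes + record.sizeInBytes()) * estimatedCompressionRatio);
            if (!current.isEmpty() && projected > maxCompressedBatchBytes) {
                batches.add(current);   // close this batch; it gets re-compressed on send
                current = new ArrayList<>();
                uncompressedBytes = 0;
            }
            current.add(record);
            uncompressedBytes += record.sizeInBytes();
        }
        if (!current.isEmpty())
            batches.add(current);
        return batches;
    }
}

If the regrouping itself is cheap, the real overhead is the extra
decompression plus one more compression pass per new batch, which is why it
matters that splits stay rare.
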
> >>> > >>
> >>> > >>
> >>> > >> > On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin <
> >>> becket....@gmail.com>
> >>> > >> wrote:
> >>> > >> >
> >>> > >> > > Hey Mayuresh,
> >>> > >> > >
> >>> > >> > > 1) The batch would be split when a RecordTooLargeException is
> >>> > >> > > received.
> >>> > >> > > 2) We do not lower the actual compression ratio; we lower the
> >>> > >> > > estimated compression ratio according to the actual compression
> >>> > >> > > ratio (ACR).
> >>> > >> > >
> >>> > >> > > As an example, let's start with the estimated compression ratio
> >>> > >> > > (ECR) = 1.0. Say the ACR is ~0.8. Instead of letting the ECR
> >>> > >> > > drop to 0.8 very quickly, we only drop it by 0.001 each time
> >>> > >> > > ACR < ECR. However, once we see ACR > ECR, we increment the ECR
> >>> > >> > > by 0.05. If a RecordTooLargeException is received, we reset the
> >>> > >> > > ECR back to 1.0 and split the batch.
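
For my own understanding, here is a minimal sketch of that evolution as I
read it (the constants and names are illustrative, not necessarily what the
patch does):

// Illustrative sketch only, not the actual producer code.
final class CompressionRatioEstimate {
    private static final double CONSERVATIVE_RATIO = 1.0;   // start by assuming no compression
    private static final double DECREMENT_ON_OVERESTIMATE = 0.001;
    private static final double INCREMENT_ON_UNDERESTIMATE = 0.05;

    private double ecr = CONSERVATIVE_RATIO;

    double value() {
        return ecr;
    }

    // Called with the actual compression ratio (ACR) observed for a batch.
    void update(double acr) {
        if (acr > ecr)
            ecr = Math.min(CONSERVATIVE_RATIO, ecr + INCREMENT_ON_UNDERESTIMATE); // back off quickly
        else
            ecr = Math.max(0.0, ecr - DECREMENT_ON_OVERESTIMATE);                 // creep down slowly
    }

    // Called when the broker returns RecordTooLargeException; the producer
    // would also split the batch and resend the pieces.
    void onRecordTooLarge() {
        ecr = CONSERVATIVE_RATIO;
    }
}

Is that roughly the intended behavior?
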
> >>> > >> > >
> >>> > >> > > Thanks,
> >>> > >> > >
> >>> > >> > > Jiangjie (Becket) Qin
> >>> > >> > >
> >>> > >> > >
> >>> > >> > >
> >>> > >> > > On Mon, Feb 27, 2017 at 10:30 AM, Mayuresh Gharat <
> >>> > >> > > gharatmayures...@gmail.com> wrote:
> >>> > >> > >
> >>> > >> > > > Hi Becket,
> >>> > >> > > >
> >>> > >> > > > Seems like an interesting idea.
> >>> > >> > > > I had a couple of questions:
> >>> > >> > > > 1) How do we decide when the batch should be split?
> >>> > >> > > > 2) What do you mean by slowly lowering the "actual"
> >>> compression
> >>> > >> ratio?
> >>> > >> > > > An example would really help here.
> >>> > >> > > >
> >>> > >> > > > Thanks,
> >>> > >> > > >
> >>> > >> > > > Mayuresh
> >>> > >> > > >
> >>> > >> > > > On Fri, Feb 24, 2017 at 3:17 PM, Becket Qin <
> >>> becket....@gmail.com
> >>> > >
> >>> > >> > > wrote:
> >>> > >> > > >
> >>> > >> > > > > Hi Jay,
> >>> > >> > > > >
> >>> > >> > > > > Yeah, I got your point.
> >>> > >> > > > >
> >>> > >> > > > > I think there might be a solution which does not require
> >>> > >> > > > > adding a new configuration. We can start from a very
> >>> > >> > > > > conservative compression ratio, say 1.0, and lower it very
> >>> > >> > > > > slowly according to the actual compression ratio until we hit
> >>> > >> > > > > a point where we have to split a batch. At that point, we
> >>> > >> > > > > exponentially back off on the compression ratio. The idea is
> >>> > >> > > > > somewhat like TCP. This should help avoid frequent splits.
> >>> > >> > > > >
> >>> > >> > > > > The upper bound of the batch size is also a little awkward
> >>> > >> > > > > today because we say the batch size is based on the compressed
> >>> > >> > > > > size, but users cannot set it to the max message size because
> >>> > >> > > > > that would result in oversized messages. With this change we
> >>> > >> > > > > will be able to allow users to set the batch size close to the
> >>> > >> > > > > max message size.
> >>> > >> > > > >
> >>> > >> > > > > However, the downside is that there could be latency spikes
> >>> > >> > > > > in the system in this case due to the splitting, especially
> >>> > >> > > > > when there are many messages that need to be split at the same
> >>> > >> > > > > time. That could potentially be an issue for some users.
> >>> > >> > > > >
> >>> > >> > > > > What do you think about this approach?
> >>> > >> > > > >
> >>> > >> > > > > Thanks,
> >>> > >> > > > >
> >>> > >> > > > > Jiangjie (Becket) Qin
> >>> > >> > > > >
> >>> > >> > > > >
> >>> > >> > > > >
> >>> > >> > > > > On Thu, Feb 23, 2017 at 1:31 PM, Jay Kreps <
> >>> j...@confluent.io>
> >>> > >> wrote:
> >>> > >> > > > >
> >>> > >> > > > > > Hey Becket,
> >>> > >> > > > > >
> >>> > >> > > > > > Yeah that makes sense.
> >>> > >> > > > > >
> >>> > >> > > > > > I agree that you'd really have to both fix the estimation
> >>> > >> > > > > > (i.e. make it per topic or make it better at estimating the
> >>> > >> > > > > > high percentiles) AND have the recovery mechanism. If you
> >>> > >> > > > > > are underestimating often and then paying a high recovery
> >>> > >> > > > > > price, that won't fly.
> >>> > >> > > > > >
> >>> > >> > > > > > I think you take my main point though, which is just that I
> >>> > >> > > > > > hate to expose these super-low-level options to users
> >>> > >> > > > > > because it is so hard to explain to people what they mean
> >>> > >> > > > > > and how they should be set. So if it is possible to make
> >>> > >> > > > > > either some combination of better estimation and splitting,
> >>> > >> > > > > > or better tolerance of overage, work, that would be
> >>> > >> > > > > > preferable.
> >>> > >> > > > > >
> >>> > >> > > > > > -Jay
> >>> > >> > > > > >
> >>> > >> > > > > > On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin <
> >>> > >> becket....@gmail.com
> >>> > >> > >
> >>> > >> > > > > wrote:
> >>> > >> > > > > >
> >>> > >> > > > > > > @Dong,
> >>> > >> > > > > > >
> >>> > >> > > > > > > Thanks for the comments. The default behavior of the
> >>> > >> > > > > > > producer won't change. If users want to use the
> >>> > >> > > > > > > uncompressed message size, they will probably also bump up
> >>> > >> > > > > > > the batch size to somewhere close to the max message size.
> >>> > >> > > > > > > This will be in the documentation. BTW, the default batch
> >>> > >> > > > > > > size is 16 KB, which is pretty small.
> >>> > >> > > > > > >
> >>> > >> > > > > > > @Jay,
> >>> > >> > > > > > >
> >>> > >> > > > > > > Yeah, we actually debated quite a bit internally what the
> >>> > >> > > > > > > best solution to this is.
> >>> > >> > > > > > >
> >>> > >> > > > > > > I completely agree it is a bug. In practice we usually
> >>> > >> > > > > > > leave some headroom to allow the compressed size to grow a
> >>> > >> > > > > > > little if the original messages are not compressible, for
> >>> > >> > > > > > > example, 1000 KB instead of exactly 1 MB. That is likely
> >>> > >> > > > > > > safe enough.
> >>> > >> > > > > > >
> >>> > >> > > > > > > The major concern for the rejected alternative is
> >>> > >> > > > > > > performance. It largely depends on how frequently we need
> >>> > >> > > > > > > to split a batch, i.e. how likely the estimation is to be
> >>> > >> > > > > > > off. If we only need to do the split work occasionally,
> >>> > >> > > > > > > the cost would be amortized, so we don't need to worry
> >>> > >> > > > > > > about it too much. However, it looks like for a producer
> >>> > >> > > > > > > shared by multiple topics, the estimation is always off.
> >>> > >> > > > > > > As an example, consider two topics, one with compression
> >>> > >> > > > > > > ratio 0.6 and the other 0.2. Assuming exactly the same
> >>> > >> > > > > > > traffic, the average compression ratio would be roughly
> >>> > >> > > > > > > 0.4, which is not right for either of the topics. So
> >>> > >> > > > > > > almost half of the batches (those of the topic with the
> >>> > >> > > > > > > 0.6 compression ratio) will end up larger than the
> >>> > >> > > > > > > configured batch size (roughly 0.6 / 0.4 = 1.5x the
> >>> > >> > > > > > > target). When it comes to more topics, such as with mirror
> >>> > >> > > > > > > maker, this becomes even more unpredictable. To avoid
> >>> > >> > > > > > > frequent rejection / splitting of the batches, we need to
> >>> > >> > > > > > > configure the batch size pretty conservatively. This could
> >>> > >> > > > > > > actually hurt performance, because we are shoehorning the
> >>> > >> > > > > > > messages that are highly compressible into a small batch
> >>> > >> > > > > > > so that the other topics that are not as compressible will
> >>> > >> > > > > > > not become too large with the same batch size. At
> >>> > >> > > > > > > LinkedIn, our batch size is configured to 64 KB because of
> >>> > >> > > > > > > this. I think we may actually get better batching if we
> >>> > >> > > > > > > just use the uncompressed message size and an 800 KB batch
> >>> > >> > > > > > > size.
> >>> > >> > > > > > >
> >>> > >> > > > > > > We did not think about loosening the message size
> >>> > >> > > > > > > restriction, but that sounds like a viable solution given
> >>> > >> > > > > > > that the consumer can now fetch oversized messages. One
> >>> > >> > > > > > > concern would be that on the broker side oversized
> >>> > >> > > > > > > messages will bring more memory pressure. With KIP-92, we
> >>> > >> > > > > > > may mitigate that, but the memory allocation for large
> >>> > >> > > > > > > messages may not be very GC friendly. I need to think
> >>> > >> > > > > > > about this a little more.
> >>> > >> > > > > > >
> >>> > >> > > > > > > Thanks,
> >>> > >> > > > > > >
> >>> > >> > > > > > > Jiangjie (Becket) Qin
> >>> > >> > > > > > >
> >>> > >> > > > > > >
> >>> > >> > > > > > > On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps <
> >>> > j...@confluent.io>
> >>> > >> > > wrote:
> >>> > >> > > > > > >
> >>> > >> > > > > > > > Hey Becket,
> >>> > >> > > > > > > >
> >>> > >> > > > > > > > I get the problem we want to solve with this, but I
> >>> > >> > > > > > > > don't think this is something that makes sense as a
> >>> > >> > > > > > > > user-controlled knob that everyone sending data to Kafka
> >>> > >> > > > > > > > has to think about. It is basically a bug, right?
> >>> > >> > > > > > > >
> >>> > >> > > > > > > > First, as a technical question: is it true that using
> >>> > >> > > > > > > > the uncompressed size for batching actually guarantees
> >>> > >> > > > > > > > that you observe the limit? I think that implies that
> >>> > >> > > > > > > > compression always makes the messages smaller, which I
> >>> > >> > > > > > > > think is usually true but is not guaranteed, right?
> >>> > >> > > > > > > > E.g. if someone encrypts their data, which tends to
> >>> > >> > > > > > > > randomize it, and then enables compression, it could get
> >>> > >> > > > > > > > slightly bigger?
> >>> > >> > > > > > > >
> >>> > >> > > > > > > > I also wonder if the rejected alternatives you describe
> >>> > >> > > > > > > > couldn't be made to work: basically try to be a bit
> >>> > >> > > > > > > > better at estimation and recover when we guess wrong. I
> >>> > >> > > > > > > > don't think the memory usage should be a problem: isn't
> >>> > >> > > > > > > > it the same memory usage the consumer of that topic
> >>> > >> > > > > > > > would need? And can't you do the splitting and
> >>> > >> > > > > > > > recompression in a streaming fashion? If we can make the
> >>> > >> > > > > > > > estimation miss rate low and the recovery cost is just
> >>> > >> > > > > > > > ~2x the normal cost for that batch, that should be
> >>> > >> > > > > > > > totally fine, right? (It's technically true you might
> >>> > >> > > > > > > > have to split more than once, but since you halve it
> >>> > >> > > > > > > > each time, you should get a number of halvings that is
> >>> > >> > > > > > > > logarithmic in the miss size, which, with better
> >>> > >> > > > > > > > estimation, you'd hope would be super duper small.)
> >>> > >> > > > > > > >
> >>> > >> > > > > > > > Alternatively, maybe we could work on the other side of
> >>> > >> > > > > > > > the problem and try to make it so that a small miss on
> >>> > >> > > > > > > > message size isn't a big problem. I think the original
> >>> > >> > > > > > > > issue was that the max size and fetch size were tightly
> >>> > >> > > > > > > > coupled, and the way memory in the consumer worked, you
> >>> > >> > > > > > > > really wanted the fetch size to be as small as possible
> >>> > >> > > > > > > > because you'd use that much memory per fetched
> >>> > >> > > > > > > > partition, and the consumer would get stuck if its fetch
> >>> > >> > > > > > > > size wasn't big enough. I think we made some progress on
> >>> > >> > > > > > > > that issue, and maybe more could be done there so that a
> >>> > >> > > > > > > > small bit of fuzziness around the size would not be an
> >>> > >> > > > > > > > issue?
> >>> > >> > > > > > > >
> >>> > >> > > > > > > > -Jay
> >>> > >> > > > > > > >
> >>> > >> > > > > > > >
> >>> > >> > > > > > > >
> >>> > >> > > > > > > > On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin <
> >>> > >> > > becket....@gmail.com
> >>> > >> > > > >
> >>> > >> > > > > > > wrote:
> >>> > >> > > > > > > >
> >>> > >> > > > > > > > > Hi folks,
> >>> > >> > > > > > > > >
> >>> > >> > > > > > > > > I would like to start the discussion thread on
> >>> > >> > > > > > > > > KIP-126. The KIP proposes adding a new configuration
> >>> > >> > > > > > > > > to KafkaProducer to allow batching based on
> >>> > >> > > > > > > > > uncompressed message size.
> >>> > >> > > > > > > > >
> >>> > >> > > > > > > > > Comments are welcome.
> >>> > >> > > > > > > > >
> >>> > >> > > > > > > > > The KIP wiki is here:
> >>> > >> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
> >>> > >> > > > > > > > >
> >>> > >> > > > > > > > > Thanks,
> >>> > >> > > > > > > > >
> >>> > >> > > > > > > > > Jiangjie (Becket) Qin
> >>> > >> > > > > > > > >
> >>> > >> > > > > > > >
> >>> > >> > > > > > >
> >>> > >> > > > > >
> >>> > >> > > > >
> >>> > >> > > >
> >>> > >> > > >
> >>> > >> > > >
> >>> > >> > > > --
> >>> > >> > > > -Regards,
> >>> > >> > > > Mayuresh R. Gharat
> >>> > >> > > > (862) 250-7125
> >>> > >> > > >
> >>> > >> > >
> >>> > >> >
> >>> > >> >
> >>> > >> >
> >>> > >> > --
> >>> > >> > -Regards,
> >>> > >> > Mayuresh R. Gharat
> >>> > >> > (862) 250-7125
> >>> > >> >
> >>> > >>
> >>> > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >
>
