No, you are right that mapping dirty-bytes to dirty-map sizes is
non-trivial. I think it would be good to discuss an alternative approach,
but this is probably the wrong thread :)

On Thu, May 19, 2016 at 4:36 AM, Ben Stopford <b...@confluent.io> wrote:

> Hmm. Suffice it to say, this isn’t an easy thing to tune, so I would agree
> that a more holistic solution, which tuned itself to total disk
> availability, might be quite useful :)
>
> If we took the min.dirty.bytes route, and defaulted it to the segment
> size, that would work well for distributions where the dirty-map
> (compaction buffer) will be filled by a single dirty segment, but this
> would depend a bit on the message size. If messages were large the
> dirty-map might not fill, which would reduce the yield from the scan. In
> fact there seems to be a general incentive to defer scanning to ensure the
> dirty-map always fills. For this reason, the ratio approach still seems a
> little more general to me as it applies equally to large and small
> partitions.
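>
> To put rough numbers on the fill problem (purely illustrative, and from
> memory: the dirty-map needs something like 24 bytes per unique key, a
> 16-byte hash plus an 8-byte offset, and the default dedupe buffer is
> 128MB): 128MB / 24 bytes is roughly 5.6M keys. A 1GB segment of 100-byte
> messages carries ~10M messages and would fill the map, while the same
> segment of 10KB messages carries only ~100K, leaving the map ~98% empty
> and the yield of the scan correspondingly low.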
>
> Let me know if I’m missing something here.
>
> B
>
>
> > On 19 May 2016, at 06:29, Gwen Shapira <g...@confluent.io> wrote:
> >
> > Oops :)
> >
> > The docs are definitely not doing the feature any favors, but I didn't
> > mean to imply the feature is thoughtless.
> >
> > Here's the thing I'm not getting: You are trading off disk space for IO
> > efficiency. That's reasonable. But why not allow users to specify space
> > in bytes?
> >
> > Basically tell the LogCompacter: Once I have X bytes of dirty data (or
> > post KIP-58, X bytes of data that needs cleaning), please compact it to
> > the best of your ability (which in steady state will be into almost
> > nothing).
> >
> > Since we know how big the compaction buffer is and how Kafka uses it, we
> > can exactly calculate how much space we are wasting vs. how much IO we
> > are going to do per unit of time. The size of a single segment or
> > compaction buffer (whichever is bigger) can be a good default value for
> > min.dirty.bytes. We can even evaluate and re-evaluate it based on the
> > amount of free space on the disk. Heck, we can automate those tunings
> > (lower min.dirty.bytes to trigger compaction and free space if we are
> > close to running out of space).
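> >
> > To make that concrete, here is a back-of-envelope sketch (all numbers
> > made up; D is the proposed min.dirty.bytes, W the write rate, C the
> > compacted size):
> >
> >   val D = 1L << 30                // clean once 1GB of dirty data piles up
> >   val W = 10L << 20               // 10MB/s of incoming updates
> >   val C = 5L << 30                // 5GB of compacted data in steady state
> >   val secsBetweenCleanings = D / W              // ~102s between passes
> >   val bytesPerCleaning = (C + D) + C            // read clean+dirty, write clean
> >   val ioPerSec = bytesPerCleaning / secsBetweenCleanings  // ~110MB/s amortized
> >   // wasted space stays bounded by D, no matter how big the log grows
> >
> > That is exactly the space-vs-IO math you can't do up front with a ratio.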
> >
> > We can do the same capacity planning with percentages, but it requires
> > more information to know the results, information that can only be
> > acquired after you reach steady state.
> >
> > It is a bit obvious, so I'm guessing the idea was considered and
> > dismissed.
> > I just can't see why.
> > If only there were KIPs back then, so I could look at rejected
> > alternatives...
> >
> > Gwen
> >
> >
> >
> > On Wed, May 18, 2016 at 9:54 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> >> So in summary we never considered this a mechanism to give the consumer
> >> time to consume prior to compaction, just a mechanism to control space
> >> wastage. It sort of accidentally gives you that but it's super hard to
> >> reason about it as an SLA since it is relative to the log size rather
> >> than absolute.
> >>
> >> -Jay
> >>
> >> On Wed, May 18, 2016 at 9:50 PM, Jay Kreps <j...@confluent.io> wrote:
> >>
> >>> The sad part is I actually did think pretty hard about how to configure
> >>> that stuff so I guess *I* think the config makes sense! Clearly trying
> >>> to prevent my being shot :-)
> >>>
> >>> I agree the name could be improved and the documentation is quite
> >>> spartan--no guidance at all on how to set it or what it trades off. A
> >>> bit shameful.
> >>>
> >>> The thinking was this. One approach to cleaning would be to just do it
> >>> continually with the idea that, hey, you can't take that I/O with
> >>> you--once you've budgeted N MB/sec of background I/O for compaction
> >>> some of the time, you might as well just use that budget all the time.
> >>> But this leads to seemingly silly behavior where you are doing big ass
> >>> compactions all the time to free up just a few bytes and we thought it
> >>> would freak people out. Plus arguably Kafka usage isn't all in steady
> >>> state so this wastage would come out of the budget for other bursty
> >>> stuff.
> >>>
> >>> So when should compaction kick in? Well, what are you trading off?
> >>> The tradeoff here is how much space to waste on disk versus how much
> >>> I/O to use in cleaning. In general we can't say exactly how much
> >>> space a compaction will free up--during a phase of all "inserts"
> >>> compaction may free up no space at all. You just have to do the
> >>> compaction and hope for the best. But in general most compacted
> >>> topics should soon reach a "steady state" where they aren't growing
> >>> or are growing very slowly, so most writes are updates (if they keep
> >>> growing rapidly indefinitely then you are going to run out of
> >>> space--so it is safe to assume they do reach steady state). In this
> >>> steady state the ratio of uncompacted log to total log is effectively
> >>> the utilization (wasted space percentage). So if you set it to 50%
> >>> your data is about half duplicates. By tolerating more uncleaned log
> >>> you get more bang for your compaction I/O buck but more space
> >>> wastage. This seemed like a reasonable way to think about it because
> >>> maybe you know your compacted data size (roughly) so you can reason
> >>> about whether using, say, twice that space is okay.
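> >>>
> >>> (In steady state the arithmetic is simple: at a ratio r the total log
> >>> tops out around compacted_size / (1 - r), so r = 0.5 means the log is
> >>> roughly twice the compacted data, i.e. about half duplicates.)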
> >>>
> >>> Maybe we should just change the name to something about target
> >>> utilization even though that isn't strictly true except in steady
> >>> state?
> >>>
> >>> -Jay
> >>>
> >>>
> >>> On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira <g...@confluent.io> wrote:
> >>>
> >>>> Interesting!
> >>>>
> >>>> This needs to be double checked by someone with more experience, but
> >>>> reading the code, it looks like "log.cleaner.min.cleanable.ratio"
> >>>> controls *just* the second property, and I'm not even convinced about
> >>>> that.
> >>>>
> >>>> Few facts:
> >>>>
> >>>> 1. Each cleaner thread cleans one log at a time. It always goes for
> >>>> the log with the largest percentage of non-compacted bytes. If you
> >>>> just created a new partition, wrote 1G and switched to a new segment,
> >>>> it is very likely that this will be the next log to compact. This
> >>>> explains the behavior Eric and Jay complained about; I expected it to
> >>>> be rare.
> >>>>
> >>>> 2. If the dirtiest log has less than 50% dirty bytes (or whatever
> >>>> min.cleanable is), it will be skipped, knowing that all others have
> >>>> an even lower dirty ratio (see the sketch below).
> >>>>
> >>>> 3. If we do decide to clean a log, we will clean the whole damn thing,
> >>>> leaving only the active segment. Contrary to my expectations, it does
> >>>> not leave any dirty byte behind. So *at most* you will have a single
> >>>> uncleaned segment (the active one). Again, this explains why Jay,
> >>>> James and Eric are unhappy.
> >>>>
> >>>> 4. What it does guarantee (kinda? at least I think it tries?) is to
> >>>> always clean a large "chunk" of data at once, hopefully minimizing
> >>>> churn (cleaning small bits off the same log over and over) and
> >>>> minimizing IO. It does have the nice mathematical property of
> >>>> guaranteeing double the amount of time between cleanings (except it
> >>>> doesn't really, because who knows the size of the compacted region).
> >>>>
> >>>> 5. Whoever wrote the docs should be shot :)
> >>>>
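> >>>> To make facts 1 and 2 concrete, the selection logic boils down to
> >>>> something like this (a sketch from my reading of the code; the names
> >>>> are mine, not Kafka's):
> >>>>
> >>>>   case class LogStats(cleanBytes: Long, dirtyBytes: Long) {
> >>>>     // fraction of the log that has not been compacted yet
> >>>>     def dirtyRatio: Double =
> >>>>       dirtyBytes.toDouble / (cleanBytes + dirtyBytes)
> >>>>   }
> >>>>
> >>>>   def nextLogToClean(logs: Seq[LogStats],
> >>>>                      minCleanable: Double): Option[LogStats] =
> >>>>     if (logs.isEmpty) None
> >>>>     else Some(logs.maxBy(_.dirtyRatio))      // fact 1: dirtiest log first
> >>>>       .filter(_.dirtyRatio >= minCleanable)  // fact 2: or clean nothing
> >>>>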
> >>>> so, in conclusion:
> >>>> In my mind, min.cleanable.dirty.ratio is terrible: it is misleading,
> >>>> difficult to understand, and IMO doesn't even do what it should do.
> >>>> I would like to consider the possibility of
> >>>> min.cleanable.dirty.bytes, which should give good control over # of
> >>>> IO operations (since the size of the compaction buffer is known).
> >>>>
> >>>> In the context of this KIP, the interaction between cleanable ratio and
> >>>> cleanable bytes will be similar, and it looks like it was already done
> >>>> correctly in the PR, so no worries ("the ratio's definition will be
> >>>> expanded to become the ratio of "compactable" to compactable plus
> >>>> compacted message sizes. Where compactable includes log segments that
> >>>> are neither the active segment nor those prohibited from being
> >>>> compacted because they contain messages that do not satisfy all the
> >>>> new lag constraints").
> >>>>
> >>>> I may open a new KIP to handle the cleanable ratio. Please don't let
> >>>> my confusion detract from this KIP.
> >>>>
> >>>> Gwen
> >>>>
> >>>> On Wed, May 18, 2016 at 3:41 PM, Ben Stopford <b...@confluent.io> wrote:
> >>>>> Generally, this seems like a sensible proposal to me.
> >>>>>
> >>>>> Regarding (1): time and message count seem sensible. I can’t think
> >>>>> of a specific use case for bytes but it seems like there could be
> >>>>> one.
> >>>>>
> >>>>> Regarding (2):
> >>>>> The setting log.cleaner.min.cleanable.ratio currently seems to have
> >>>>> two uses. It controls which messages will not be compacted, but it
> >>>>> also provides a fractional bound on how many logs are cleaned (and
> >>>>> hence work done) in each round. This new proposal seems aimed at the
> >>>>> first use, but not the second.
> >>>>>
> >>>>> The second case better suits a fractional setting like the one we
> >>>>> have now. Using a fractional value means the amount of data cleaned
> >>>>> scales in proportion to the data stored in the log. If we were to
> >>>>> replace this with an absolute value it would create proportionally
> >>>>> more cleaning work as the log grew in size.
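> >>>>>
> >>>>> To illustrate with made-up numbers: at a 0.5 ratio a 10GB log is
> >>>>> cleaned once 5GB of dirty data accumulates, and a 100GB log once
> >>>>> 50GB does; with an absolute min.dirty.bytes of 5GB, the 100GB log
> >>>>> would instead be rewritten after every 5GB of writes, i.e. ten times
> >>>>> as many full-log rewrites for the same volume of updates.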
> >>>>>
> >>>>> So, if I understand this correctly, I think there is an argument
> >>>>> for having both.
> >>>>>
> >>>>>
> >>>>>> On 17 May 2016, at 19:43, Gwen Shapira <g...@confluent.io> wrote:
> >>>>>>
> >>>>>> .... and Spark's implementation is another good reason to allow
> >>>>>> compaction lag.
> >>>>>>
> >>>>>> I'm convinced :)
> >>>>>>
> >>>>>> We need to decide:
> >>>>>>
> >>>>>> 1) Do we need just the .ms config, or anything else? Consumer lag
> >>>>>> is measured (and monitored) in messages, so if we need this feature
> >>>>>> to somehow work in tandem with consumer lag monitoring, I think we
> >>>>>> need .messages too.
> >>>>>>
> >>>>>> 2) Does this new configuration allow us to get rid of the
> >>>>>> cleaner.ratio config?
> >>>>>>
> >>>>>> Gwen
> >>>>>>
> >>>>>>
> >>>>>> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman
> >>>>>> <eric.wasser...@gmail.com> wrote:
> >>>>>>> James,
> >>>>>>>
> >>>>>>> Your pictures do an excellent job of illustrating my point.
> >>>>>>>
> >>>>>>> My mention of the additional "10's of minutes to hours" refers to
> >>>>>>> how far after the original target checkpoint (T1 in your diagram)
> >>>>>>> one may need to go to get to a checkpoint where all partitions of
> >>>>>>> all topics are in the uncompacted region of their respective logs.
> >>>>>>> In terms of your diagram: the T3 transaction could have been
> >>>>>>> written 10's of minutes to hours after T1, as that was how much
> >>>>>>> time it took all readers to get to T1.
> >>>>>>>
> >>>>>>>> You would not have to start over from the beginning in order to
> >>>>>>>> read to T3.
> >>>>>>>
> >>>>>>> While I agree this is technically true, in practice it could be
> >>>>>>> very onerous to actually do it. For example, we use the Kafka
> >>>>>>> consumer that is part of the Spark Streaming library to read
> >>>>>>> table topics. It accepts a range of offsets to read for each
> >>>>>>> partition. Say we originally target ranges from offset 0 to the
> >>>>>>> offset of T1 for each topic+partition. There really is no way to
> >>>>>>> have the library arrive at T1 and then "keep going" to T3. What
> >>>>>>> is worse, given Spark's design, if you lost a worker during your
> >>>>>>> calculations you would be in a rather sticky position. Spark
> >>>>>>> achieves resiliency not by data redundancy but by keeping track
> >>>>>>> of how to reproduce the transformations leading to a state. In
> >>>>>>> the face of a lost worker, Spark would try to re-read that
> >>>>>>> portion of the data on the lost worker from Kafka. However, in
> >>>>>>> the interim compaction may have moved past the reproducible
> >>>>>>> checkpoint (T3), rendering the data inconsistent. At best the
> >>>>>>> entire calculation would need to start over targeting some later
> >>>>>>> transaction checkpoint.
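> >>>>>>>
> >>>>>>> For reference, the read looks roughly like this (paraphrased from
> >>>>>>> memory; the topic name and the t1Offset* vals are illustrative,
> >>>>>>> not our real code), which is why there is no way to "keep going"
> >>>>>>> past ranges that were fixed up front:
> >>>>>>>
> >>>>>>>   import kafka.serializer.StringDecoder
> >>>>>>>   import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
> >>>>>>>
> >>>>>>>   // T1's offsets per partition, captured before the job starts
> >>>>>>>   val t1OffsetP0 = 1000000L // illustrative
> >>>>>>>   val t1OffsetP1 = 1000500L // illustrative
> >>>>>>>   // one fixed, immutable range per partition, ending at T1
> >>>>>>>   val ranges = Array(
> >>>>>>>     OffsetRange("table-topic", 0, 0L, t1OffsetP0),
> >>>>>>>     OffsetRange("table-topic", 1, 0L, t1OffsetP1))
> >>>>>>>   // sc is the SparkContext; kafkaParams holds metadata.broker.list
> >>>>>>>   val rdd = KafkaUtils.createRDD[String, String,
> >>>>>>>     StringDecoder, StringDecoder](sc, kafkaParams, ranges)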
> >>>>>>>
> >>>>>>> Needless to say, with the proposed feature everything is quite
> >>>>>>> simple. As long as we set the compaction lag large enough we can
> >>>>>>> be assured that T1 will remain in the uncompacted region and
> >>>>>>> thereby be reproducible. Thus reading from 0 to the offsets in T1
> >>>>>>> will be sufficient for the duration of the calculation.
> >>>>>>>
> >>>>>>> Eric
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
>
>
