Hmm. Suffice it to say, this isn’t an easy thing to tune, so I would agree that a
more holistic solution, which tuned itself to total disk availability, might be 
quite useful :)

If we took the min.dirty.bytes route, and defaulted it to the segment size, 
that would work well for distributions where the dirty-map (compaction buffer) 
will be filled by a single dirty segment, but this would depend a bit on the 
message size. If messages were large the dirty-map might not fill, which would 
reduce the yield from the scan. In fact there seems to be a general incentive to
defer scanning to ensure the dirty-map always fills. For this reason, the ratio
approach still seems a little more general to me as it applies equally to large 
and small partitions. 
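
To put rough numbers on the dirty-map point, here is a back-of-envelope sketch
in Python. It is illustrative only, not the cleaner's code: the function names
are made up, the 24 bytes per map entry and the 0.9 load factor are assumptions
about how the buffer is sized, and it takes the worst case where every message
in the segment has a distinct key.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def offset_map_capacity(buffer_bytes, entry_bytes=24, load_factor=0.9):
    """Keys the dirty-map can hold (entry size and load factor are assumed)."""
    return int(buffer_bytes * load_factor) // entry_bytes


def dirty_map_fill(segment_bytes, avg_message_bytes, buffer_bytes):
    """Fraction of the dirty-map one full dirty segment would fill, capped at 1.

    Assumes every message carries a distinct key (worst case for the map).
    """
    messages = segment_bytes // avg_message_bytes
    return min(1.0, messages / offset_map_capacity(buffer_bytes))


def triggers(clean_bytes, dirty_bytes, min_ratio, min_dirty_bytes):
    """Return (ratio_trigger, bytes_trigger) for one log.

    ratio_trigger mimics a min.cleanable.ratio style check; bytes_trigger
    mimics the proposed (hypothetical) min.dirty.bytes check.
    """
    total = clean_bytes + dirty_bytes
    ratio = dirty_bytes / total if total else 0.0
    return ratio >= min_ratio, dirty_bytes >= min_dirty_bytes


if __name__ == "__main__":
    # One 1 GiB dirty segment against a 128 MiB buffer.
    print(dirty_map_fill(GIB, 100, 128 * MIB))        # small messages
    print(dirty_map_fill(GIB, 10 * 1024, 128 * MIB))  # large messages
    # Large, mostly clean log vs. small log, each with 1 GiB of dirty data.
    print(triggers(10 * GIB, GIB, 0.5, GIB))
    print(triggers(GIB, GIB, 0.5, GIB))
```

Under these assumptions, 100-byte messages fill the map from a single segment
while 10 KiB messages leave it almost empty. The last function shows the other
side of the trade-off: with 1 GiB of dirty data, a fixed byte threshold fires
on a 10 GiB clean log where a 50% ratio would still wait.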

Let me know if I’m missing something here. 

B


> On 19 May 2016, at 06:29, Gwen Shapira <g...@confluent.io> wrote:
> 
> Oops :)
> 
> The docs are definitely not doing the feature any favors, but I didn't mean
> to imply the feature is thoughtless.
> 
> Here's the thing I'm not getting: You are trading off disk space for IO
> efficiency. That's reasonable. But why not allow users to specify space in
> bytes?
> 
> Basically tell the LogCompacter: Once I have X bytes of dirty data (or post
> KIP-58, X bytes of data that needs cleaning), please compact it to the best
> of your ability (which in steady state will be into almost nothing).
> 
> Since we know how big the compaction buffer is and how Kafka uses it, we
> can exactly calculate how much space we are wasting vs. how much IO we are
> going to do per unit of time. The size of a single segment or compaction
> buffer (whichever is bigger) can be a good default value for
> min.dirty.bytes. We can even evaluate and re-evaluate it based on the
> amount of free space on the disk. Heck, we can automate those tunings
> (lower min.dirty.bytes to trigger compaction and free space if we are close
> to running out of space).
> 
> We can do the same capacity planning with percentages but it requires more
> information to know the results, information that can only be acquired
> after you reach steady state.
> 
> It is a bit obvious, so I'm guessing the idea was considered and dismissed.
> I just can't see why.
> If only there were KIPs back then, so I could look at rejected
> alternatives...
> 
> Gwen
> 
> 
> 
> On Wed, May 18, 2016 at 9:54 PM, Jay Kreps <j...@confluent.io> wrote:
> 
>> So in summary we never considered this a mechanism to give the consumer
>> time to consume prior to compaction, just a mechanism to control space
>> wastage. It sort of accidentally gives you that but it's super hard to
>> reason about it as an SLA since it is relative to the log size rather than
>> absolute.
>> 
>> -Jay
>> 
>> On Wed, May 18, 2016 at 9:50 PM, Jay Kreps <j...@confluent.io> wrote:
>> 
>>> The sad part is I actually did think pretty hard about how to configure
>>> that stuff so I guess *I* think the config makes sense! Clearly trying to
>>> prevent my being shot :-)
>>> 
>>> I agree the name could be improved and the documentation is quite
>>> spartan--no guidance at all on how to set it or what it trades off. A bit
>>> shameful.
>>> 
>>> The thinking was this. One approach to cleaning would be to just do it
>>> continually with the idea that, hey, you can't take that I/O with
>>> you--once you've budgeted N MB/sec of background I/O for compaction some
>>> of the time, you might as well just use that budget all the time. But this
>>> leads to seemingly silly behavior where you are doing big ass compactions
>>> all the time to free up just a few bytes and we thought it would freak
>>> people out. Plus arguably Kafka usage isn't all in steady state so this
>>> wastage would come out of the budget for other bursty stuff.
>>> 
>>> So when should compaction kick in? Well what are you trading off? The
>>> tradeoff here is how much space to waste on disk versus how much I/O to
>>> use in cleaning. In general we can't say exactly how much space a
>>> compaction will free up--during a phase of all "inserts" compaction may
>>> free up no space at all. You just have to do the compaction and hope for
>>> the best. But in general for most compacted topics they should soon reach
>>> a "steady state" where they aren't growing or growing very slowly, so most
>>> writes are updates (if they keep growing rapidly indefinitely then you are
>>> going to run out of space--so safe to assume they do reach steady state).
>>> In this steady state the ratio of uncompacted log to total log is
>>> effectively the utilization (wasted space percentage). So if you set it to
>>> 50% your data is about half duplicates. By tolerating more uncleaned log
>>> you get more bang for your compaction I/O buck but more space wastage.
>>> This seemed like a reasonable way to think about it because maybe you know
>>> your compacted data size (roughly) so you can reason about whether using,
>>> say, twice that space is okay.
>>> 
>>> Maybe we should just change the name to something about target
>>> utilization even though that isn't strictly true except in steady state?
>>> 
>>> -Jay
>>> 
>>> 
>>> On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira <g...@confluent.io> wrote:
>>> 
>>>> Interesting!
>>>> 
>>>> This needs to be double checked by someone with more experience, but
>>>> reading the code, it looks like "log.cleaner.min.cleanable.ratio"
>>>> controls *just* the second property, and I'm not even convinced about
>>>> that.
>>>> 
>>>> A few facts:
>>>> 
>>>> 1. Each cleaner thread cleans one log at a time. It always goes for
>>>> the log with the largest percentage of non-compacted bytes. If you
>>>> just created a new partition, wrote 1G and switched to a new segment,
>>>> it is very likely that this will be the next log to compact, which
>>>> explains the behavior Eric and Jay complained about. I expected it
>>>> to be rare.
>>>> 
>>>> 2. If the dirtiest log has less than 50% dirty bytes (or whatever
>>>> min.cleanable is), it will be skipped, knowing that others have even
>>>> lower dirty ratio.
>>>> 
>>>> 3. If we do decide to clean a log, we will clean the whole damn thing,
>>>> leaving only the active segment. Contrary to my expectations, it does
>>>> not leave any dirty byte behind. So *at most* you will have a single
>>>> clean segment. Again, this explains why Jay, James and Eric are unhappy.
>>>> 
>>>> 4. What it does guarantee (kinda? at least I think it tries?) is to
>>>> always clean a large "chunk" of data at once, hopefully minimizing
>>>> churn (cleaning small bits off the same log over and over) and
>>>> minimizing IO. It does have the nice mathematical property of
>>>> guaranteeing double the amount of time between cleanings (except it
>>>> doesn't really, because who knows the size of the compacted region).
>>>> 
>>>> 5. Whoever wrote the docs should be shot :)
>>>> 
>>>> so, in conclusion:
>>>> In my mind, min.cleanable.dirty.ratio is terrible, it is misleading,
>>>> difficult to understand, and IMO doesn't even do what it should do.
>>>> I would like to consider the possibility of
>>>> min.cleanable.dirty.bytes, which should give good control over # of IO
>>>> operations (since the size of compaction buffer is known).
>>>> 
>>>> In the context of this KIP, the interaction with cleanable ratio and
>>>> cleanable bytes will be similar, and it looks like it was already done
>>>> correctly in the PR, so no worries ("the ratio's definition will be
>>>> expanded to become the ratio of "compactable" to compactable plus
>>>> compacted message sizes. Where compactable includes log segments that
>>>> are neither the active segment nor those prohibited from being
>>>> compacted because they contain messages that do not satisfy all the
>>>> new lag constraints").
>>>> 
>>>> I may open a new KIP to handle the cleanable ratio. Please don't let
>>>> my confusion detract from this KIP.
>>>> 
>>>> Gwen
>>>> 
>>>> On Wed, May 18, 2016 at 3:41 PM, Ben Stopford <b...@confluent.io> wrote:
>>>>> Generally, this seems like a sensible proposal to me.
>>>>> 
>>>>> Regarding (1): time and message count seem sensible. I can’t think of a
>>>>> specific use case for bytes but it seems like there could be one.
>>>>> 
>>>>> Regarding (2):
>>>>> The setting log.cleaner.min.cleanable.ratio currently seems to have two
>>>>> uses. It controls which messages will not be compacted, but it also
>>>>> provides a fractional bound on how many logs are cleaned (and hence work
>>>>> done) in each round. This new proposal seems aimed at the first use, but
>>>>> not the second.
>>>>> 
>>>>> The second case better suits a fractional setting like the one we have
>>>>> now. Using a fractional value means the amount of data cleaned scales in
>>>>> proportion to the data stored in the log. If we were to replace this with
>>>>> an absolute value it would create proportionally more cleaning work as the
>>>>> log grew in size.
>>>>> 
>>>>> So, if I understand this correctly, I think there is an argument for
>>>>> having both.
>>>>> 
>>>>> 
>>>>>> On 17 May 2016, at 19:43, Gwen Shapira <g...@confluent.io> wrote:
>>>>>> 
>>>>>> .... and Spark's implementation is another good reason to allow
>>>>>> compaction lag.
>>>>>> 
>>>>>> I'm convinced :)
>>>>>> 
>>>>>> We need to decide:
>>>>>> 
>>>>>> 1) Do we need just .ms config, or anything else? consumer lag is
>>>>>> measured (and monitored) in messages, so if we need this feature to
>>>>>> somehow work in tandem with consumer lag monitoring, I think we need
>>>>>> .messages too.
>>>>>> 
>>>>>> 2) Does this new configuration allow us to get rid of the
>>>>>> cleaner.ratio config?
>>>>>> 
>>>>>> Gwen
>>>>>> 
>>>>>> 
>>>>>> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman
>>>>>> <eric.wasser...@gmail.com> wrote:
>>>>>>> James,
>>>>>>> 
>>>>>>> Your pictures do an excellent job of illustrating my point.
>>>>>>> 
>>>>>>> My mention of the additional "10's of minutes to hours" refers to how
>>>>>>> far after the original target checkpoint (T1 in your diagram) one may
>>>>>>> need to go to get to a checkpoint where all partitions of all topics
>>>>>>> are in the uncompacted region of their respective logs. In terms of
>>>>>>> your diagram: the T3 transaction could have been written 10's of
>>>>>>> minutes to hours after T1 as that was how much time it took all
>>>>>>> readers to get to T1.
>>>>>>> 
>>>>>>>> You would not have to start over from the beginning in order to read
>>>>>>>> to T3.
>>>>>>> 
>>>>>>> While I agree this is technically true, in practice it could be very
>>>>>>> onerous to actually do it. For example, we use the Kafka consumer that
>>>>>>> is part of the Spark Streaming library to read table topics. It accepts
>>>>>>> a range of offsets to read for each partition. Say we originally target
>>>>>>> ranges from offset 0 to the offset of T1 for each topic+partition.
>>>>>>> There really is no way to have the library arrive at T1 and then "keep
>>>>>>> going" to T3. What is worse, given Spark's design, if you lost a worker
>>>>>>> during your calculations you would be in a rather sticky position.
>>>>>>> Spark achieves resiliency not by data redundancy but by keeping track
>>>>>>> of how to reproduce the transformations leading to a state. In the face
>>>>>>> of a lost worker, Spark would try to re-read that portion of the data
>>>>>>> on the lost worker from Kafka. However, in the interim compaction may
>>>>>>> have moved past the reproducible checkpoint (T3) rendering the data
>>>>>>> inconsistent. At best the entire calculation would need to start over
>>>>>>> targeting some later transaction checkpoint.
>>>>>>> 
>>>>>>> Needless to say with the proposed feature everything is quite simple.
>>>>>>> As long as we set the compaction lag large enough we can be assured
>>>>>>> that T1 will remain in the uncompacted region and thereby be
>>>>>>> reproducible. Thus reading from 0 to the offsets in T1 will be
>>>>>>> sufficient for the duration of the calculation.
>>>>>>> 
>>>>>>> Eric
>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
