Hmm. Suffice to say, this isn’t an easy thing to tune, so I would agree that a more holistic solution, which tuned itself to total disk availability, might be quite useful :)
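To make the comparison concrete, here's a rough sketch of the two trigger styles we're discussing. This is illustrative only — not the actual LogCleaner code, min.dirty.bytes is still hypothetical, and the names (LogStats, eligibleByRatio, eligibleByBytes) are invented for the example:

// Illustrative sketch only -- not Kafka's LogCleaner implementation.
case class LogStats(cleanBytes: Long, dirtyBytes: Long)

object CleanerTrigger {

  // Current style: a log becomes cleanable once dirty / (clean + dirty) reaches
  // min.cleanable.dirty.ratio, so the trigger scales with the size of the log.
  def eligibleByRatio(stats: LogStats, minDirtyRatio: Double): Boolean = {
    val total = stats.cleanBytes + stats.dirtyBytes
    total > 0 && stats.dirtyBytes.toDouble / total >= minDirtyRatio
  }

  // Hypothetical min.dirty.bytes style: a log becomes cleanable once an absolute
  // amount of dirty data accumulates, regardless of how big the log already is.
  def eligibleByBytes(stats: LogStats, minDirtyBytes: Long): Boolean =
    stats.dirtyBytes >= minDirtyBytes
}

The ratio style triggers at the same proportion of dirty data whether the partition is tiny or huge; the bytes style triggers on the same absolute volume, which is easier to map onto I/O and disk budgets but interacts with segment and dirty-map sizing, as below.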
If we took the min.dirty.bytes route, and defaulted it to the segment size, that would work well for distributions where the dirty-map (compaction buffer) will be filled by a single dirty segment, but this would depend a bit on the message size. If messages were large the dirty-map might not fill, which would reduce the yield from the scan. In fact there seems to be a general incentive to defer scanning to ensure the dirty-map always fills. For this reason, the ratio approach still seems a little more general to me as it applies equally to large and small partitions. Let me know if I'm missing something here.

B

> On 19 May 2016, at 06:29, Gwen Shapira <g...@confluent.io> wrote:
>
> Oops :)
>
> The docs are definitely not doing the feature any favors, but I didn't mean to imply the feature is thoughtless.
>
> Here's the thing I'm not getting: You are trading off disk space for IO efficiency. That's reasonable. But why not allow users to specify space in bytes?
>
> Basically tell the LogCompacter: Once I have X bytes of dirty data (or post KIP-58, X bytes of data that needs cleaning), please compact it to the best of your ability (which in steady state will be into almost nothing).
>
> Since we know how big the compaction buffer is and how Kafka uses it, we can exactly calculate how much space we are wasting vs. how much IO we are going to do per unit of time. The size of a single segment or compaction buffer (whichever is bigger) can be a good default value for min.dirty.bytes. We can even evaluate and re-evaluate it based on the amount of free space on the disk. Heck, we can automate those tunings (lower min.dirty.bytes to trigger compaction and free space if we are close to running out of space).
>
> We can do the same capacity planning with percentages, but it requires more information to know the results, information that can only be acquired after you reach steady state.
>
> It is a bit obvious, so I'm guessing the idea was considered and dismissed. I just can't see why. If only there were KIPs back then, so I could look at rejected alternatives...
>
> Gwen
>
> On Wed, May 18, 2016 at 9:54 PM, Jay Kreps <j...@confluent.io> wrote:
>
>> So in summary we never considered this a mechanism to give the consumer time to consume prior to compaction, just a mechanism to control space wastage. It sort of accidentally gives you that, but it's super hard to reason about it as an SLA since it is relative to the log size rather than absolute.
>>
>> -Jay
>>
>> On Wed, May 18, 2016 at 9:50 PM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> The sad part is I actually did think pretty hard about how to configure that stuff, so I guess *I* think the config makes sense! Clearly trying to prevent my being shot :-)
>>>
>>> I agree the name could be improved and the documentation is quite spartan--no guidance at all on how to set it or what it trades off. A bit shameful.
>>>
>>> The thinking was this. One approach to cleaning would be to just do it continually with the idea that, hey, you can't take that I/O with you--once you've budgeted N MB/sec of background I/O for compaction some of the time, you might as well just use that budget all the time. But this leads to seemingly silly behavior where you are doing big ass compactions all the time to free up just a few bytes, and we thought it would freak people out.
>>> Plus arguably Kafka usage isn't all in steady state, so this wastage would come out of the budget for other bursty stuff.
>>>
>>> So when should compaction kick in? Well, what are you trading off? The tradeoff here is how much space to waste on disk versus how much I/O to use in cleaning. In general we can't say exactly how much space a compaction will free up--during a phase of all "inserts" compaction may free up no space at all. You just have to do the compaction and hope for the best. But in general most compacted topics should soon reach a "steady state" where they aren't growing, or are growing very slowly, so most writes are updates (if they keep growing rapidly indefinitely then you are going to run out of space--so it's safe to assume they do reach steady state). In this steady state the ratio of uncompacted log to total log is effectively the utilization (wasted space percentage). So if you set it to 50% your data is about half duplicates. By tolerating more uncleaned log you get more bang for your compaction I/O buck but more space wastage. This seemed like a reasonable way to think about it because maybe you know your compacted data size (roughly), so you can reason about whether using, say, twice that space is okay.
>>>
>>> Maybe we should just change the name to something about target utilization, even though that isn't strictly true except in steady state?
>>>
>>> -Jay
>>>
>>> On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira <g...@confluent.io> wrote:
>>>
>>>> Interesting!
>>>>
>>>> This needs to be double-checked by someone with more experience, but reading the code, it looks like "log.cleaner.min.cleanable.ratio" controls *just* the second property, and I'm not even convinced about that.
>>>>
>>>> A few facts:
>>>>
>>>> 1. Each cleaner thread cleans one log at a time. It always goes for the log with the largest percentage of non-compacted bytes. If you just created a new partition, wrote 1G and switched to a new segment, it is very likely that this will be the next log to compact. This explains the behavior Eric and Jay complained about. I expected it to be rare.
>>>>
>>>> 2. If the dirtiest log has less than 50% dirty bytes (or whatever min.cleanable is), it will be skipped, knowing that the others have an even lower dirty ratio.
>>>>
>>>> 3. If we do decide to clean a log, we will clean the whole damn thing, leaving only the active segment. Contrary to my expectations, it does not leave any dirty byte behind. So *at most* you will have a single clean segment. Again, explaining why Jay, James and Eric are unhappy.
>>>>
>>>> 4. What it does guarantee (kinda? at least I think it tries?) is to always clean a large "chunk" of data at once, hopefully minimizing churn (cleaning small bits off the same log over and over) and minimizing IO. It does have the nice mathematical property of guaranteeing double the amount of time between cleanings (except it doesn't really, because who knows the size of the compacted region).
>>>>
>>>> 5. Whoever wrote the docs should be shot :)
>>>>
>>>> So, in conclusion: in my mind, min.cleanable.dirty.ratio is terrible, it is misleading, difficult to understand, and IMO doesn't even do what it should do.
>>>> I would like to consider the possibility of min.cleanable.dirty.bytes, which should give good control over the number of IO operations (since the size of the compaction buffer is known).
>>>>
>>>> In the context of this KIP, the interaction with the cleanable ratio and cleanable bytes will be similar, and it looks like it was already done correctly in the PR, so no worries ("the ratio's definition will be expanded to become the ratio of "compactable" to compactable plus compacted message sizes. Where compactable includes log segments that are neither the active segment nor those prohibited from being compacted because they contain messages that do not satisfy all the new lag constraints").
>>>>
>>>> I may open a new KIP to handle the cleanable ratio. Please don't let my confusion detract from this KIP.
>>>>
>>>> Gwen
>>>>
>>>> On Wed, May 18, 2016 at 3:41 PM, Ben Stopford <b...@confluent.io> wrote:
>>>>
>>>>> Generally, this seems like a sensible proposal to me.
>>>>>
>>>>> Regarding (1): time and message count seem sensible. I can’t think of a specific use case for bytes but it seems like there could be one.
>>>>>
>>>>> Regarding (2): the setting log.cleaner.min.cleanable.ratio currently seems to have two uses. It controls which messages will not be compacted, but it also provides a fractional bound on how many logs are cleaned (and hence work done) in each round. This new proposal seems aimed at the first use, but not the second.
>>>>>
>>>>> The second case better suits a fractional setting like the one we have now. Using a fractional value means the amount of data cleaned scales in proportion to the data stored in the log. If we were to replace this with an absolute value it would create proportionally more cleaning work as the log grew in size.
>>>>>
>>>>> So, if I understand this correctly, I think there is an argument for having both.
>>>>>
>>>>>> On 17 May 2016, at 19:43, Gwen Shapira <g...@confluent.io> wrote:
>>>>>>
>>>>>> .... and Spark's implementation is another good reason to allow compaction lag.
>>>>>>
>>>>>> I'm convinced :)
>>>>>>
>>>>>> We need to decide:
>>>>>>
>>>>>> 1) Do we need just the .ms config, or anything else? Consumer lag is measured (and monitored) in messages, so if we need this feature to somehow work in tandem with consumer lag monitoring, I think we need .messages too.
>>>>>>
>>>>>> 2) Does this new configuration allow us to get rid of the cleaner.ratio config?
>>>>>>
>>>>>> Gwen
>>>>>>
>>>>>> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman <eric.wasser...@gmail.com> wrote:
>>>>>>> James,
>>>>>>>
>>>>>>> Your pictures do an excellent job of illustrating my point.
>>>>>>>
>>>>>>> My mention of the additional "10's of minutes to hours" refers to how far after the original target checkpoint (T1 in your diagram) one may need to go to get to a checkpoint where all partitions of all topics are in the uncompacted region of their respective logs. In terms of your diagram: the T3 transaction could have been written 10's of minutes to hours after T1, as that was how much time it took all readers to get to T1.
>>>>>>>
>>>>>>>> You would not have to start over from the beginning in order to read to T3.
>>>>>>>
>>>>>>> While I agree this is technically true, in practice it could be very onerous to actually do it.
>>>>>>> For example, we use the Kafka consumer that is part of the Spark Streaming library to read table topics. It accepts a range of offsets to read for each partition. Say we originally target ranges from offset 0 to the offset of T1 for each topic+partition. There really is no way to have the library arrive at T1 and then "keep going" to T3. What is worse, given Spark's design, if you lost a worker during your calculations you would be in a rather sticky position. Spark achieves resiliency not by data redundancy but by keeping track of how to reproduce the transformations leading to a state. In the face of a lost worker, Spark would try to re-read that portion of the data on the lost worker from Kafka. However, in the interim compaction may have moved past the reproducible checkpoint (T3), rendering the data inconsistent. At best the entire calculation would need to start over targeting some later transaction checkpoint.
>>>>>>>
>>>>>>> Needless to say, with the proposed feature everything is quite simple. As long as we set the compaction lag large enough, we can be assured that T1 will remain in the uncompacted region and thereby be reproducible. Thus reading from 0 to the offsets in T1 will be sufficient for the duration of the calculation.
>>>>>>>
>>>>>>> Eric
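PS For Eric's Spark case, here's a back-of-the-envelope sketch of how one might size the lag so that checkpointed offset ranges stay reproducible. It assumes the proposed per-topic minimum compaction lag is honored (a record appended at time t is not eligible for compaction before t + lag); the object and method names are invented for illustration, not part of the KIP:

// Illustrative sketch only, assuming the proposed min compaction lag is honored.
object CompactionLagPlanning {

  // Rule of thumb: the configured lag must cover the time from writing the T1
  // checkpoint to the last possible (re-)read of it, i.e. reader catch-up time
  // plus job runtime plus a safety margin.
  def requiredLagMs(readerCatchUpMs: Long, jobRuntimeMs: Long, safetyMarginMs: Long): Long =
    readerCatchUpMs + jobRuntimeMs + safetyMarginMs

  // True if an offset range ending at T1 is still guaranteed to sit in the
  // uncompacted region when a lost worker re-reads it.
  def rangeStillReproducible(configuredLagMs: Long, elapsedSinceT1Ms: Long): Boolean =
    elapsedSinceT1Ms < configuredLagMs
}

For example, if readers take up to 2 hours to reach T1 and the job itself runs for 4 hours, a lag of at least 6 hours plus some margin keeps every re-read of the 0-to-T1 ranges consistent.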