No, you are right that mapping dirty-bytes to dirty-map sizes is non-trivial. I think it would be good to discuss an alternative approach, but this is probably the wrong thread :)
On Thu, May 19, 2016 at 4:36 AM, Ben Stopford <b...@confluent.io> wrote:

> Hmm. Suffice to say, this isn’t an easy thing to tune, so I would agree that a more holistic solution, which tuned itself to total disk availability, might be quite useful :)
>
> If we took the min.dirty.bytes route, and defaulted it to the segment size, that would work well for distributions where the dirty-map (compaction buffer) will be filled by a single dirty segment, but this would depend a bit on the message size. If messages were large, the dirty-map might not fill, which would reduce the yield from the scan. In fact, there seems to be a general incentive to defer scanning to ensure the dirty-map always fills. For this reason, the ratio approach still seems a little more general to me, as it applies equally to large and small partitions.
>
> Let me know if I’m missing something here.
>
> B
>
> On 19 May 2016, at 06:29, Gwen Shapira <g...@confluent.io> wrote:
>
>> Oops :)
>>
>> The docs are definitely not doing the feature any favors, but I didn't mean to imply the feature is thoughtless.
>>
>> Here's the thing I'm not getting: you are trading off disk space for IO efficiency. That's reasonable. But why not allow users to specify space in bytes?
>>
>> Basically, tell the LogCompacter: once I have X bytes of dirty data (or, post KIP-58, X bytes of data that needs cleaning), please compact it to the best of your ability (which in steady state will be into almost nothing).
>>
>> Since we know how big the compaction buffer is and how Kafka uses it, we can exactly calculate how much space we are wasting vs. how much IO we are going to do per unit of time. The size of a single segment or the compaction buffer (whichever is bigger) can be a good default value for min.dirty.bytes. We can even evaluate and re-evaluate it based on the amount of free space on the disk.
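The capacity-planning arithmetic Gwen describes can be sketched numerically. Note that `min.dirty.bytes` is the hypothetical config under discussion, not an existing Kafka setting, and all names and values below are illustrative:

```python
# Back-of-envelope model for the proposed min.dirty.bytes idea.
# All function names and numbers are illustrative, not Kafka internals.

def cleanings_per_day(write_rate_mb_per_s, min_dirty_bytes):
    """How often compaction would trigger if it fires every time
    min_dirty_bytes of new (dirty) data accumulates."""
    bytes_per_day = write_rate_mb_per_s * 1024 * 1024 * 86_400
    return bytes_per_day / min_dirty_bytes

def max_wasted_bytes(min_dirty_bytes, segment_bytes):
    """Upper bound on uncompacted data sitting on disk between cleanings:
    the dirty threshold plus the active segment, which is never cleaned."""
    return min_dirty_bytes + segment_bytes

# Example: 1 MB/s of updates, trigger after 1 GiB dirty, 1 GiB segments.
rate = 1.0
dirty = 1 * 1024**3
seg = 1 * 1024**3
print(f"cleanings/day: {cleanings_per_day(rate, dirty):.1f}")   # ~84.4
print(f"max waste: {max_wasted_bytes(dirty, seg) / 1024**3:.1f} GiB")  # 2.0
```

The point being made: with an absolute byte threshold, both the cleaning frequency and the space bound follow directly from the configured value, without needing to know the steady-state log size first.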
>> Heck, we can automate those tunings (lower min.dirty.bytes to trigger compaction and free space if we are close to running out of space).
>>
>> We can do the same capacity planning with percentages, but it requires more information to know the results, information that can only be acquired after you reach steady state.
>>
>> It is a bit obvious, so I'm guessing the idea was considered and dismissed. I just can't see why. If only there were KIPs back then, so I could look at rejected alternatives...
>>
>> Gwen
>>
>> On Wed, May 18, 2016 at 9:54 PM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> So in summary we never considered this a mechanism to give the consumer time to consume prior to compaction, just a mechanism to control space wastage. It sort of accidentally gives you that, but it's super hard to reason about it as an SLA since it is relative to the log size rather than absolute.
>>>
>>> -Jay
>>>
>>> On Wed, May 18, 2016 at 9:50 PM, Jay Kreps <j...@confluent.io> wrote:
>>>
>>>> The sad part is I actually did think pretty hard about how to configure that stuff, so I guess *I* think the config makes sense! Clearly trying to prevent my being shot :-)
>>>>
>>>> I agree the name could be improved and the documentation is quite spartan--no guidance at all on how to set it or what it trades off. A bit shameful.
>>>>
>>>> The thinking was this. One approach to cleaning would be to just do it continually with the idea that, hey, you can't take that I/O with you--once you've budgeted N MB/sec of background I/O for compaction some of the time, you might as well just use that budget all the time. But this leads to seemingly silly behavior where you are doing big-ass compactions all the time to free up just a few bytes, and we thought it would freak people out.
>>>> Plus arguably Kafka usage isn't all in steady state, so this wastage would come out of the budget for other bursty stuff.
>>>>
>>>> So when should compaction kick in? Well, what are you trading off? The tradeoff here is how much space to waste on disk versus how much I/O to use in cleaning. In general we can't say exactly how much space a compaction will free up--during a phase of all "inserts" compaction may free up no space at all. You just have to do the compaction and hope for the best. But in general most compacted topics should soon reach a "steady state" where they aren't growing, or are growing very slowly, so most writes are updates (if they keep growing rapidly indefinitely then you are going to run out of space--so it is safe to assume they do reach steady state). In this steady state the ratio of uncompacted log to total log is effectively the utilization (wasted space percentage). So if you set it to 50%, your data is about half duplicates. By tolerating more uncleaned log you get more bang for your compaction I/O buck, but more space wastage. This seemed like a reasonable way to think about it because maybe you know your compacted data size (roughly), so you can reason about whether using, say, twice that space is okay.
>>>>
>>>> Maybe we should just change the name to something about target utilization, even though that isn't strictly true except in steady state?
>>>>
>>>> -Jay
>>>>
>>>> On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira <g...@confluent.io> wrote:
>>>>
>>>>> Interesting!
>>>>>
>>>>> This needs to be double-checked by someone with more experience, but reading the code, it looks like "log.cleaner.min.cleanable.ratio" controls *just* the second property, and I'm not even convinced about that.
>>>>>
>>>>> A few facts:
>>>>>
>>>>> 1. Each cleaner thread cleans one log at a time. It always goes for the log with the largest percentage of non-compacted bytes. If you just created a new partition, wrote 1G and switched to a new segment, it is very likely that this will be the next log to compact, explaining the behavior Eric and Jay complained about. I expected it to be rare.
>>>>>
>>>>> 2. If the dirtiest log has less than 50% dirty bytes (or whatever min.cleanable is), it will be skipped, knowing that the others have an even lower dirty ratio.
>>>>>
>>>>> 3. If we do decide to clean a log, we will clean the whole damn thing, leaving only the active segment. Contrary to my expectations, it does not leave any dirty bytes behind. So *at most* you will have a single clean segment. Again, explaining why Jay, James and Eric are unhappy.
>>>>>
>>>>> 4. What it does guarantee (kinda? at least I think it tries?) is to always clean a large "chunk" of data at once, hopefully minimizing churn (cleaning small bits off the same log over and over) and minimizing IO. It does have the nice mathematical property of guaranteeing double the amount of time between cleanings (except it doesn't really, because who knows the size of the compacted region).
>>>>>
>>>>> 5. Whoever wrote the docs should be shot :)
>>>>>
>>>>> So, in conclusion: in my mind, min.cleanable.dirty.ratio is terrible: it is misleading, difficult to understand, and IMO doesn't even do what it should do. I would like to consider the possibility of min.cleanable.dirty.bytes, which should give good control over the number of IO operations (since the size of the compaction buffer is known).
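The selection behavior Gwen describes in facts 1-2 can be sketched as follows. This is an illustration of the logic as she reads it, not the actual LogCleaner code; the data layout is made up for the example:

```python
# Each candidate log is modeled as (name, clean_bytes, dirty_bytes).
MIN_CLEANABLE_RATIO = 0.5  # default of log.cleaner.min.cleanable.ratio

def dirty_ratio(clean_bytes, dirty_bytes):
    """Fraction of the log, by bytes, that has not been compacted yet."""
    total = clean_bytes + dirty_bytes
    return dirty_bytes / total if total else 0.0

def pick_log_to_clean(logs):
    """Pick the log with the largest dirty ratio (fact 1); skip the round
    entirely if even the dirtiest log is below the threshold (fact 2)."""
    if not logs:
        return None
    name, clean, dirty = max(logs, key=lambda l: dirty_ratio(l[1], l[2]))
    return name if dirty_ratio(clean, dirty) >= MIN_CLEANABLE_RATIO else None

logs = [("a", 900, 100),   # 10% dirty
        ("b", 200, 800),   # 80% dirty -> chosen
        ("c", 600, 400)]   # 40% dirty
print(pick_log_to_clean(logs))  # -> b
```

This also makes the complaint concrete: a freshly written partition has a dirty ratio near 1.0, so it jumps to the front of the queue even though nothing has been updated yet.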
>>>>> In the context of this KIP, the interaction with the cleanable ratio and cleanable bytes will be similar, and it looks like it was already done correctly in the PR, so no worries ("the ratio's definition will be expanded to become the ratio of "compactable" to compactable plus compacted message sizes, where compactable includes log segments that are neither the active segment nor those prohibited from being compacted because they contain messages that do not satisfy all the new lag constraints").
>>>>>
>>>>> I may open a new KIP to handle the cleanable ratio. Please don't let my confusion detract from this KIP.
>>>>>
>>>>> Gwen
>>>>>
>>>>> On Wed, May 18, 2016 at 3:41 PM, Ben Stopford <b...@confluent.io> wrote:
>>>>>
>>>>>> Generally, this seems like a sensible proposal to me.
>>>>>>
>>>>>> Regarding (1): time and message count seem sensible. I can’t think of a specific use case for bytes, but it seems like there could be one.
>>>>>>
>>>>>> Regarding (2): the setting log.cleaner.min.cleanable.ratio currently seems to have two uses. It controls which messages will not be compacted, but it also provides a fractional bound on how many logs are cleaned (and hence work done) in each round. This new proposal seems aimed at the first use, but not the second.
>>>>>>
>>>>>> The second case better suits a fractional setting like the one we have now. Using a fractional value means the amount of data cleaned scales in proportion to the data stored in the log. If we were to replace this with an absolute value, it would create proportionally more cleaning work as the log grew in size.
>>>>>>
>>>>>> So, if I understand this correctly, I think there is an argument for having both.
>>>>>>
>>>>>> On 17 May 2016, at 19:43, Gwen Shapira <g...@confluent.io> wrote:
>>>>>>
>>>>>>> .... and Spark's implementation is another good reason to allow compaction lag.
>>>>>>>
>>>>>>> I'm convinced :)
>>>>>>>
>>>>>>> We need to decide:
>>>>>>>
>>>>>>> 1) Do we need just the .ms config, or anything else? Consumer lag is measured (and monitored) in messages, so if we need this feature to somehow work in tandem with consumer lag monitoring, I think we need .messages too.
>>>>>>>
>>>>>>> 2) Does this new configuration allow us to get rid of the cleaner.ratio config?
>>>>>>>
>>>>>>> Gwen
>>>>>>>
>>>>>>> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman <eric.wasser...@gmail.com> wrote:
>>>>>>>
>>>>>>>> James,
>>>>>>>>
>>>>>>>> Your pictures do an excellent job of illustrating my point.
>>>>>>>>
>>>>>>>> My mention of the additional "10's of minutes to hours" refers to how far after the original target checkpoint (T1 in your diagram) one may need to go to get to a checkpoint where all partitions of all topics are in the uncompacted region of their respective logs. In terms of your diagram: the T3 transaction could have been written 10's of minutes to hours after T1, as that was how much time it took all readers to get to T1.
>>>>>>>>
>>>>>>>>> You would not have to start over from the beginning in order to read to T3.
>>>>>>>>
>>>>>>>> While I agree this is technically true, in practice it could be very onerous to actually do. For example, we use the Kafka consumer that is part of the Spark Streaming library to read table topics. It accepts a range of offsets to read for each partition. Say we originally target ranges from offset 0 to the offset of T1 for each topic+partition. There really is no way to have the library arrive at T1 and then "keep going" to T3.
>>>>>>>> What is worse, given Spark's design, if you lost a worker during your calculations you would be in a rather sticky position. Spark achieves resiliency not by data redundancy but by keeping track of how to reproduce the transformations leading to a state. In the face of a lost worker, Spark would try to re-read that portion of the data on the lost worker from Kafka. However, in the interim compaction may have moved past the reproducible checkpoint (T3), rendering the data inconsistent. At best, the entire calculation would need to start over, targeting some later transaction checkpoint.
>>>>>>>>
>>>>>>>> Needless to say, with the proposed feature everything is quite simple. As long as we set the compaction lag large enough, we can be assured that T1 will remain in the uncompacted region and thereby be reproducible. Thus reading from 0 to the offsets in T1 will be sufficient for the duration of the calculation.
>>>>>>>>
>>>>>>>> Eric
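The guarantee Eric relies on reduces to a per-segment eligibility check: a segment only becomes compactable once its newest message is older than the configured lag, so any checkpoint younger than the lag stays readable. The sketch below illustrates that check; the names and the one-day value are descriptive assumptions, not the KIP's actual implementation:

```python
import time

# Illustrative version of the lag constraint discussed in this thread.
MIN_COMPACTION_LAG_MS = 24 * 60 * 60 * 1000  # e.g. one day

def is_compactable(largest_timestamp_ms, is_active, now_ms=None):
    """A segment may be compacted only if it is not the active segment and
    every message in it is at least MIN_COMPACTION_LAG_MS old."""
    if is_active:
        return False  # the active segment is never compacted
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - largest_timestamp_ms >= MIN_COMPACTION_LAG_MS

now = 1_000_000_000_000
print(is_compactable(now - 1_000, False, now))                      # False: too recent
print(is_compactable(now - 2 * MIN_COMPACTION_LAG_MS, False, now))  # True
```

Under this rule, a Spark job that reads offsets 0..T1 is safe as long as it finishes within the configured lag of T1's write time, which is exactly the property Eric wants.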