Tom,

Documentation improvements are always welcome. The docs are in /docs under the main repository; just send a PR for trunk and we are good :)
Segment sizes - I have some objections, but this can be discussed in its own thread. I feel like I did enough hijacking and Eric may get annoyed at some point.

Gwen

On Fri, May 20, 2016 at 5:19 AM, Tom Crayford <tcrayf...@heroku.com> wrote:

Hi,

From our perspective (running thousands of Kafka clusters), the main issues we see with compacted topics *aren't* disk space usage or IO utilization of the log cleaner.

Size matters a *lot* to the usability of consumers bootstrapping from the beginning - in fact we've been debating tuning the log segment size for compacted topics down to 100MB, because right now leaving 1GB of uncompacted log makes some bootstrapping take way too long (especially for non-JVM clients; even in fast languages like Go they're not as capable of high throughput as the JVM clients). I'm wondering if that should be a default in Kafka itself as well, and would be happy to contribute that kind of change upstream. Kafka already tunes the __consumer_offsets topic down to 100MB per segment for this exact reason.

Secondly, the docs don't make it clear (and this has confused dozens of well-intentioned, smart folk that we've talked to, and likely thousands of Kafka users across the board) that compaction is an *alternative* to time-based retention. Lots of folk used compaction assuming "it's like time-based retention, but with even less space usage". Switching between the two is thankfully easy, but it's been a very confusing thing to understand. I'd like to contribute back clearer docs to Kafka about this. Should I send a PR? Would that be welcome?

Thirdly, most users *don't* want to tune Kafka's settings at all, or even know how or when they should. Whilst some amount of tuning is inevitable, the drive Gwen has towards "less tuning" is very positive from our perspective. Most users of most software (including technical users of data storage and messaging systems) want to "just use it" and not worry about "do I need to monitor a thousand things and then tune another thousand based on my metrics". Whilst some of that is unavoidable (for sure), it feels like compaction tuning should be something the project provides *great* general-purpose defaults for - defaults that cover most of the cases and leave tuning to the 1% of folk who really, really care. The current defaults seem to be doing well here (barring the above note about log compaction size), and any future changes should keep this up.

Thanks

Tom Crayford
Heroku Kafka

On Fri, May 20, 2016 at 4:48 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Gwen,

Yeah, specifying in bytes versus the utilization percent would have been easier to implement. The argument against that is that users are basically terrible at predicting and updating data sizes as stuff grows, and you'd perhaps have to set this for each individual log. Currently I think the utilization number of 50% is pretty reasonable for most people and you only need to tune it if you really want to optimize. But if you set a fixed-size compaction threshold in bytes, then how aggressive it is and the resulting utilization depend entirely on the compacted size of the data in the topic. I.e. if it defaults to 20GB then that becomes the minimum size of the log, so if you end up with a bunch of topics with 100MB of compacted data they all end up growing to 20GB. As a user, if you think you've written 100*100MB worth of compacted partitions but Kafka has 100*20GB of data, I think you'd be a bit shocked.
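To put rough numbers on that, here is a back-of-the-envelope sketch in Java; the 20GB threshold, the 100MB compacted size and the topic count are the illustrative values from the example above, not Kafka defaults:

```java
public class CompactionFloorSketch {
    public static void main(String[] args) {
        long fixedDirtyBytesFloor = 20L * 1024 * 1024 * 1024; // hypothetical 20GB "dirty bytes" threshold
        long compactedSizePerTopic = 100L * 1024 * 1024;      // each topic compacts down to ~100MB
        int topics = 100;

        // With a fixed byte threshold, each log must accumulate ~20GB of dirty data
        // before it becomes eligible for cleaning, so 20GB becomes the floor per topic.
        long fixedPolicyFootprint = topics * (compactedSizePerTopic + fixedDirtyBytesFloor);

        // With a 50% cleanable ratio, dirty data only grows to roughly the compacted size
        // before cleaning kicks in, so each topic stays around 2x its compacted size.
        long ratioPolicyFootprint = topics * 2 * compactedSizePerTopic;

        System.out.printf("fixed-bytes policy: ~%d GB, ratio policy: ~%d GB%n",
                fixedPolicyFootprint >> 30, ratioPolicyFootprint >> 30);
    }
}
```

With those illustrative numbers the fixed-bytes policy ends up around 2TB on disk while the ratio policy stays around 20GB.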
Ben - I think your proposal attempts to minimize total I/O by waiting until the compaction buffer will be maxed out. Each unique key in the uncompacted log uses 24 bytes of compaction buffer IIRC, but since you don't know the number of unique keys it's a bit hard to guess this. You could assume they are all unique and only compact when you have N/24 messages in the uncompacted log, where N is the compaction buffer size in bytes. The issue, as with Gwen's proposal, is that by doing this you really lose control of disk utilization, which might be a bit unintuitive. Your idea of just using the free disk space might fix this, though it might be somewhat complex in the mixed setting with both compacted and non-compacted topics.
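A quick sketch of that N/24 sizing rule, assuming the ~24 bytes per unique key mentioned above and a 128MB dedupe buffer (128MB is an assumed value here; the relevant broker setting is log.cleaner.dedupe.buffer.size):

```java
public class DedupeBufferSketch {
    public static void main(String[] args) {
        long dedupeBufferBytes = 128L * 1024 * 1024; // assumed log.cleaner.dedupe.buffer.size
        int bytesPerKey = 24;                        // ~24 bytes of compaction buffer per unique key

        // Worst case (every message has a unique key): the buffer caps how many
        // messages a single cleaning pass can deduplicate in one go.
        long maxMessagesPerPass = dedupeBufferBytes / bytesPerKey; // N / 24

        System.out.printf("one cleaning pass can index at most ~%,d unique keys%n", maxMessagesPerPass);
    }
}
```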
One other thing worth noting is that compaction isn't just for disk space. A consumer that bootstraps from the beginning (a la state restore in Kafka Streams) has to fully read and process the whole log, so I think you want to compact even when you still have free space.

-Jay

On Wed, May 18, 2016 at 10:29 PM, Gwen Shapira <g...@confluent.io> wrote:

Oops :)

The docs are definitely not doing the feature any favors, but I didn't mean to imply the feature is thoughtless.

Here's the thing I'm not getting: you are trading off disk space for IO efficiency. That's reasonable. But why not allow users to specify space in bytes?

Basically, tell the LogCompacter: once I have X bytes of dirty data (or post-KIP-58, X bytes of data that needs cleaning), please compact it to the best of your ability (which in steady state will be into almost nothing).

Since we know how big the compaction buffer is and how Kafka uses it, we can calculate exactly how much space we are wasting vs. how much IO we are going to do per unit of time. The size of a single segment or of the compaction buffer (whichever is bigger) can be a good default value for min.dirty.bytes. We can even evaluate and re-evaluate it based on the amount of free space on the disk. Heck, we can automate those tunings (lower min.dirty.bytes to trigger compaction and free space if we are close to running out of space).

We can do the same capacity planning with percentages, but it requires more information to know the results - information that can only be acquired after you reach steady state.

It is a bit obvious, so I'm guessing the idea was considered and dismissed. I just can't see why. If only there were KIPs back then, so I could look at rejected alternatives...

Gwen

On Wed, May 18, 2016 at 9:54 PM, Jay Kreps <j...@confluent.io> wrote:

So in summary: we never considered this a mechanism to give the consumer time to consume prior to compaction, just a mechanism to control space wastage. It sort of accidentally gives you that, but it's super hard to reason about as an SLA since it is relative to the log size rather than absolute.

-Jay

On Wed, May 18, 2016 at 9:50 PM, Jay Kreps <j...@confluent.io> wrote:

The sad part is I actually did think pretty hard about how to configure that stuff, so I guess *I* think the config makes sense! Clearly trying to prevent my being shot :-)

I agree the name could be improved and the documentation is quite spartan - no guidance at all on how to set it or what it trades off. A bit shameful.

The thinking was this. One approach to cleaning would be to just do it continually, with the idea that, hey, you can't take that I/O with you - once you've budgeted N MB/sec of background I/O for compaction some of the time, you might as well just use that budget all the time. But this leads to seemingly silly behavior where you are doing big-ass compactions all the time to free up just a few bytes, and we thought it would freak people out. Plus, arguably, Kafka usage isn't all in steady state, so this wastage would come out of the budget for other bursty stuff.

So when should compaction kick in? Well, what are you trading off? The tradeoff here is how much space to waste on disk versus how much I/O to use in cleaning. In general we can't say exactly how much space a compaction will free up - during a phase of all "inserts" compaction may free up no space at all. You just have to do the compaction and hope for the best. But in general most compacted topics should soon reach a "steady state" where they aren't growing, or are growing very slowly, so most writes are updates (if they keep growing rapidly indefinitely then you are going to run out of space, so it's safe to assume they do reach steady state). In this steady state the ratio of uncompacted log to total log is effectively the utilization (wasted space percentage). So if you set it to 50%, your data is about half duplicates. By tolerating more uncleaned log you get more bang for your compaction I/O buck, but more space wastage. This seemed like a reasonable way to think about it because maybe you know your compacted data size (roughly), so you can reason about whether using, say, twice that space is okay.

Maybe we should just change the name to something about target utilization, even though that isn't strictly true except in steady state?

-Jay
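The steady-state relationship described above can be written down directly; a small sketch, assuming a pure-update workload and an illustrative 1GB of compacted data:

```java
public class SteadyStateFootprintSketch {
    public static void main(String[] args) {
        double minCleanableRatio = 0.5;      // log.cleaner.min.cleanable.ratio
        long compactedSizeBytes = 1L << 30;  // assume the deduplicated data is ~1GB

        // Cleaning kicks in once dirty/total reaches the ratio, so just before cleaning:
        //   dirty = ratio * total,  clean = compactedSize  =>  total = compactedSize / (1 - ratio)
        double peakLogSize = compactedSizeBytes / (1.0 - minCleanableRatio);

        // At 0.5 the log peaks at ~2x the compacted size, i.e. about half duplicates.
        System.out.printf("peak log size ~%.1f GB for 1GB of compacted data%n",
                peakLogSize / (1 << 30));
    }
}
```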
On Wed, May 18, 2016 at 7:59 PM, Gwen Shapira <g...@confluent.io> wrote:

Interesting!

This needs to be double-checked by someone with more experience, but reading the code, it looks like "log.cleaner.min.cleanable.ratio" controls *just* the second property, and I'm not even convinced about that.

Few facts:

1. Each cleaner thread cleans one log at a time. It always goes for the log with the largest percentage of non-compacted bytes. If you just created a new partition, wrote 1G and switched to a new segment, it is very likely that this will be the next log to compact - explaining the behavior Eric and Jay complained about. I expected it to be rare.

2. If the dirtiest log has less than 50% dirty bytes (or whatever min.cleanable is), it will be skipped, knowing that the others have an even lower dirty ratio.

3. If we do decide to clean a log, we will clean the whole damn thing, leaving only the active segment. Contrary to my expectations, it does not leave any dirty byte behind. So *at most* you will have a single clean segment. Again, explaining why Jay, James and Eric are unhappy.

4. What it does guarantee (kinda? at least I think it tries?) is to always clean a large "chunk" of data at once, hopefully minimizing churn (cleaning small bits off the same log over and over) and minimizing IO. It does have the nice mathematical property of guaranteeing double the amount of time between cleanings (except it doesn't really, because who knows the size of the compacted region).

5. Whoever wrote the docs should be shot :)

So, in conclusion: in my mind, min.cleanable.dirty.ratio is terrible - it is misleading, difficult to understand, and IMO doesn't even do what it should do. I would like to consider the possibility of min.cleanable.dirty.bytes, which should give good control over the number of IO operations (since the size of the compaction buffer is known).

In the context of this KIP, the interaction with cleanable ratio and cleanable bytes will be similar, and it looks like it was already done correctly in the PR, so no worries ("the ratio's definition will be expanded to become the ratio of 'compactable' to compactable plus compacted message sizes, where compactable includes log segments that are neither the active segment nor those prohibited from being compacted because they contain messages that do not satisfy all the new lag constraints").

I may open a new KIP to handle the cleanable ratio. Please don't let my confusion detract from this KIP.

Gwen
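A rough, illustrative sketch of the selection behavior described in the facts above; the types and names are made up for illustration and are not the actual LogCleaner code:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class LogState {
    final String name;
    final long dirtyBytes;   // bytes written since the last cleaner point
    final long totalBytes;   // dirty + already-compacted bytes
    LogState(String name, long dirtyBytes, long totalBytes) {
        this.name = name; this.dirtyBytes = dirtyBytes; this.totalBytes = totalBytes;
    }
    double dirtyRatio() { return totalBytes == 0 ? 0.0 : (double) dirtyBytes / totalBytes; }
}

public class CleanerSelectionSketch {
    // Fact 1: always go for the log with the largest fraction of uncompacted bytes.
    // Fact 2: if even that log is below min.cleanable, skip this round entirely.
    // Fact 3: once a log is chosen, everything but the active segment gets cleaned.
    static Optional<LogState> pickLogToClean(List<LogState> logs, double minCleanableRatio) {
        return logs.stream()
                .max(Comparator.comparingDouble(LogState::dirtyRatio))
                .filter(filthiest -> filthiest.dirtyRatio() >= minCleanableRatio);
    }
}
```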
On Wed, May 18, 2016 at 3:41 PM, Ben Stopford <b...@confluent.io> wrote:

Generally, this seems like a sensible proposal to me.

Regarding (1): time and message count seem sensible. I can't think of a specific use case for bytes, but it seems like there could be one.

Regarding (2): the setting log.cleaner.min.cleanable.ratio currently seems to have two uses. It controls which messages will not be compacted, but it also provides a fractional bound on how many logs are cleaned (and hence work done) in each round. This new proposal seems aimed at the first use, but not the second.

The second case better suits a fractional setting like the one we have now. Using a fractional value means the amount of data cleaned scales in proportion to the data stored in the log. If we were to replace this with an absolute value, it would create proportionally more cleaning work as the log grew in size.

So, if I understand this correctly, I think there is an argument for having both.

On 17 May 2016, at 19:43, Gwen Shapira <g...@confluent.io> wrote:

.... and Spark's implementation is another good reason to allow compaction lag.

I'm convinced :)

We need to decide:

1) Do we need just a .ms config, or anything else? Consumer lag is measured (and monitored) in messages, so if we need this feature to somehow work in tandem with consumer lag monitoring, I think we need .messages too.

2) Does this new configuration allow us to get rid of the cleaner.ratio config?

Gwen

On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman <eric.wasser...@gmail.com> wrote:

James,

Your pictures do an excellent job of illustrating my point.

My mention of the additional "10's of minutes to hours" refers to how far after the original target checkpoint (T1 in your diagram) one may need to go to get to a checkpoint where all partitions of all topics are in the uncompacted region of their respective logs. In terms of your diagram: the T3 transaction could have been written 10's of minutes to hours after T1, as that was how much time it took all readers to get to T1.

> You would not have to start over from the beginning in order to read to T3.

While I agree this is technically true, in practice it could be very onerous to actually do. For example, we use the Kafka consumer that is part of the Spark Streaming library to read table topics. It accepts a range of offsets to read for each partition. Say we originally target ranges from offset 0 to the offset of T1 for each topic+partition. There really is no way to have the library arrive at T1 and then "keep going" to T3. What is worse, given Spark's design, if you lost a worker during your calculations you would be in a rather sticky position. Spark achieves resiliency not by data redundancy but by keeping track of how to reproduce the transformations leading to a state. In the face of a lost worker, Spark would try to re-read that portion of the data on the lost worker from Kafka. However, in the interim compaction may have moved past the reproducible checkpoint (T3), rendering the data inconsistent. At best the entire calculation would need to start over targeting some later transaction checkpoint.
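For illustration, this is the shape of the bounded read described above, sketched against the plain Java consumer rather than Spark's connector; the topic name, partition and T1 offset are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BoundedRangeRead {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("table-topic", 0); // placeholder topic/partition
        long t1Offset = 1_000_000L;                               // offset of the T1 checkpoint

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 0L); // the range is fixed up front: [0, T1)
            long nextOffset = 0L;
            while (nextOffset < t1Offset) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> rec : records) {
                    if (rec.offset() >= t1Offset) return; // stop at T1; there is no "keep going" to T3
                    nextOffset = rec.offset() + 1;
                    // apply rec to the restored table state here
                }
            }
        }
    }
}
```

If the records below T1 have been compacted away in the meantime, re-running this fixed range no longer reproduces the same state, which is the sticky position described above.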
Needless to say, with the proposed feature everything is quite simple. As long as we set the compaction lag large enough, we can be assured that T1 will remain in the uncompacted region and thereby be reproducible. Thus reading from 0 to the offsets in T1 will be sufficient for the duration of the calculation.

Eric
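For reference, a sketch of the kind of topic-level override being proposed; min.compaction.lag.ms is the config name under discussion in KIP-58, and the one-day value is just an example:

```java
import java.util.Properties;

public class CompactionLagTopicConfig {
    // Topic-level settings a table topic like the one above might carry once KIP-58 lands.
    public static Properties tableTopicConfig() {
        Properties config = new Properties();
        config.put("cleanup.policy", "compact");
        // Proposed in KIP-58: messages younger than this stay in the uncompacted head,
        // so a bootstrap that targets T1 within this window can always reproduce it.
        config.put("min.compaction.lag.ms", String.valueOf(24L * 60 * 60 * 1000));
        return config;
    }
}
```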