Interesting! This needs to be double-checked by someone with more experience, but reading the code, it looks like "log.cleaner.min.cleanable.ratio" controls *just* the second property, and I'm not even convinced about that.
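Concretely, my reading of the selection logic boils down to something like the sketch below (a Java paraphrase for illustration only; the real code is Scala in kafka.log.LogCleanerManager, so every name here is mine, not Kafka's actual API):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;

    // Illustrative model only; the real cleaner lives in Scala
    // (kafka.log.LogCleanerManager) and differs in detail.
    public class CleanerSelectionSketch {

        // Hypothetical stand-in for one partition's log: bytes already
        // compacted vs. bytes written since the last cleaner checkpoint.
        record LogState(String name, long cleanBytes, long dirtyBytes) {
            double dirtyRatio() {
                return (double) dirtyBytes / (cleanBytes + dirtyBytes);
            }
        }

        // Pick the log with the largest dirty ratio; skip the round entirely
        // if even the dirtiest log is below log.cleaner.min.cleanable.ratio.
        static Optional<LogState> grabFilthiestLog(List<LogState> logs,
                                                   double minCleanableRatio) {
            return logs.stream()
                       .max(Comparator.comparingDouble(LogState::dirtyRatio))
                       .filter(l -> l.dirtyRatio() >= minCleanableRatio);
        }

        public static void main(String[] args) {
            // A brand-new partition with 1G written and nothing yet cleaned
            // has a dirty ratio of 1.0, so it beats every established log.
            List<LogState> logs = List.of(
                new LogState("new-partition", 0L, 1_000_000_000L),
                new LogState("old-partition", 900_000_000L, 100_000_000L));
            grabFilthiestLog(logs, 0.5)
                .ifPresent(l -> System.out.println("next to clean: " + l.name()));
        }
    }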
A few facts:

1. Each cleaner thread cleans one log at a time, and it always goes for the log with the largest percentage of non-compacted bytes. If you just created a new partition, wrote 1G and rolled to a new segment, it is very likely that this will be the next log to compact, which explains the behavior Eric and Jay complained about. I had expected this to be rare.

2. If the dirtiest log has less than 50% dirty bytes (or whatever min.cleanable is set to), it is skipped, since every other log has an even lower dirty ratio.

3. If we do decide to clean a log, we clean the whole damn thing, leaving only the active segment. Contrary to my expectations, it does not leave any dirty bytes behind, so *at most* you will have a single dirty segment (the active one). Again, this explains why Jay, James and Eric are unhappy.

4. What it does guarantee (kinda? at least I think it tries?) is to always clean a large chunk of data at once, hopefully minimizing churn (cleaning small bits off the same log over and over) and minimizing IO. It does have the nice mathematical property of doubling the amount of time between cleanings (except it doesn't really, because who knows the size of the compacted region).

5. Whoever wrote the docs should be shot :)

So, in conclusion: in my mind, min.cleanable.dirty.ratio is terrible. It is misleading, difficult to understand, and IMO doesn't even do what it should do. I would like to consider the possibility of min.cleanable.dirty.bytes, which should give good control over the number of IO operations (since the size of the compaction buffer is known).

In the context of this KIP, the interaction between the cleanable ratio and cleanable bytes will be similar, and it looks like it was already done correctly in the PR, so no worries ("the ratio's definition will be expanded to become the ratio of "compactable" to compactable plus compacted message sizes. Where compactable includes log segments that are neither the active segment nor those prohibited from being compacted because they contain messages that do not satisfy all the new lag constraints").

I may open a new KIP to handle the cleanable ratio. Please don't let my confusion detract from this KIP.
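For what it's worth, the expanded definition quoted above works out to something like this (just a sketch with made-up byte counts; "withinLag" is my name for segments the new lag constraints protect):

    // Sketch of the expanded ratio from the PR text quoted above; all
    // numbers and names are made up for illustration.
    public class ExpandedRatioSketch {
        // ratio = compactable / (compactable + compacted), where
        // "compactable" excludes the active segment and any segment
        // still inside the lag window.
        static double cleanableRatio(long compactedBytes,
                                     long compactableBytes) {
            return (double) compactableBytes / (compactableBytes + compactedBytes);
        }

        public static void main(String[] args) {
            long compacted = 600L;   // already-cleaned head of the log
            long withinLag = 300L;   // too recent: protected by the lag config
            long compactable = 100L; // old enough and not the active segment

            // Note withinLag is excluded from both numerator and denominator,
            // so a large protected region keeps the ratio (and cleaning) low.
            double ratio = cleanableRatio(compacted, compactable);
            System.out.println("bytes protected by lag: " + withinLag);
            System.out.printf("cleanable ratio = %.2f%n", ratio); // 0.14
        }
    }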
Gwen

On Wed, May 18, 2016 at 3:41 PM, Ben Stopford <b...@confluent.io> wrote:
> Generally, this seems like a sensible proposal to me.
>
> Regarding (1): time and message count seem sensible. I can't think of a specific use case for bytes, but it seems like there could be one.
>
> Regarding (2): the setting log.cleaner.min.cleanable.ratio currently seems to have two uses. It controls which messages will not be compacted, but it also provides a fractional bound on how many logs are cleaned (and hence how much work is done) in each round. This new proposal seems aimed at the first use, but not the second.
>
> The second use better suits a fractional setting like the one we have now. Using a fractional value means the amount of data cleaned scales in proportion to the data stored in the log. If we were to replace this with an absolute value, it would create proportionally more cleaning work as the log grew in size.
>
> So, if I understand this correctly, I think there is an argument for having both.
>
>> On 17 May 2016, at 19:43, Gwen Shapira <g...@confluent.io> wrote:
>>
>> ... and Spark's implementation is another good reason to allow compaction lag.
>>
>> I'm convinced :)
>>
>> We need to decide:
>>
>> 1) Do we need just a .ms config, or anything else? Consumer lag is measured (and monitored) in messages, so if we need this feature to work in tandem with consumer lag monitoring, I think we need .messages too.
>>
>> 2) Does this new configuration allow us to get rid of the cleaner.ratio config?
>>
>> Gwen
>>
>> On Tue, May 17, 2016 at 9:43 AM, Eric Wasserman <eric.wasser...@gmail.com> wrote:
>>> James,
>>>
>>> Your pictures do an excellent job of illustrating my point.
>>>
>>> My mention of the additional "10's of minutes to hours" refers to how far after the original target checkpoint (T1 in your diagram) one may need to go to get to a checkpoint where all partitions of all topics are in the uncompacted region of their respective logs. In terms of your diagram: the T3 transaction could have been written 10's of minutes to hours after T1, as that was how much time it took all readers to get to T1.
>>>
>>>> You would not have to start over from the beginning in order to read to T3.
>>>
>>> While I agree this is technically true, in practice it could be very onerous to actually do. For example, we use the Kafka consumer that is part of the Spark Streaming library to read table topics. It accepts a range of offsets to read for each partition. Say we originally target ranges from offset 0 to the offset of T1 for each topic+partition. There really is no way to have the library arrive at T1 and then "keep going" to T3. What is worse, given Spark's design, if you lost a worker during your calculation you would be in a rather sticky position. Spark achieves resiliency not through data redundancy but by keeping track of how to reproduce the transformations leading to a state. In the face of a lost worker, Spark would try to re-read that portion of the data on the lost worker from Kafka. However, in the interim, compaction may have moved past the reproducible checkpoint (T3), rendering the data inconsistent. At best, the entire calculation would need to start over, targeting some later transaction checkpoint.
>>>
>>> Needless to say, with the proposed feature everything is quite simple. As long as we set the compaction lag large enough, we can be assured that T1 will remain in the uncompacted region and thereby be reproducible. Thus reading from 0 to the offsets in T1 will be sufficient for the duration of the calculation.
>>>
>>> Eric
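P.S. For anyone unfamiliar with the Spark pattern Eric describes, the fixed-offset-range read looks roughly like this with the spark-streaming-kafka direct API (Spark 1.x era; topic name, broker address and offsets below are all made up):

    import java.util.HashMap;
    import java.util.Map;

    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.OffsetRange;

    public class TableTopicSnapshot {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("table-topic-snapshot"));

            Map<String, String> kafkaParams = new HashMap<>();
            kafkaParams.put("metadata.broker.list", "broker1:9092");

            // The ranges are fixed up front: offset 0 through the offset of
            // the T1 checkpoint for each partition (42_000L is a made-up
            // offset). If compaction overtakes T1 mid-job, re-reading a lost
            // worker's slice returns different data, which is exactly the
            // consistency problem Eric describes.
            OffsetRange[] ranges = {
                OffsetRange.create("table-topic", 0, 0L, 42_000L)
            };

            JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
                sc, String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, ranges);

            System.out.println("records in range: " + rdd.count());
            sc.stop();
        }
    }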