Bump up. I am going to initiate the vote if there is no further concern with the KIP.
On Fri, Jan 6, 2017 at 11:23 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Mayuresh,

Thanks for the comment. If the message's offset is below low_watermark, then it should have been deleted by the log retention policy. Thus it is OK not to expose this message to the consumer. Does this answer your question?

Thanks,
Dong

On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Hi Dong,

Thanks for the KIP.

I had a question (which might have been answered before).

1) The KIP says that the low_water_mark will be updated periodically by the broker, like the high_water_mark. Essentially we want to use low_water_mark for cases where an entire segment cannot be deleted because maybe segment_start_offset < PurgeOffset < segment_end_offset, in which case we will set the low_water_mark to PurgeOffset+1.

2) The KIP also says that messages below low_water_mark will not be exposed to consumers, which does make sense since we want to say that data below low_water_mark is purged.

Looking at the above conditions, does it make sense to not update the low_water_mark periodically but only on a PurgeRequest? The reason being: if we update it periodically, then as per 2) we will not be allowing consumers to re-consume data that is not purged but is below low_water_mark.

Thanks,

Mayuresh

On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks for reviewing the KIP!

1. The low_watermark will be checkpointed in a new file named "replication-low-watermark-checkpoint". It will have the same format as the existing replication-offset-checkpoint file. This allows us to keep the existing format of checkpoint files, which map a TopicPartition to a Long. I just updated the "Public Interface" section in the KIP wiki to explain this file.

2. I think using the low_watermark from the leader to trigger log retention in the follower will work correctly, in the sense that all messages with offset < low_watermark can be deleted. But I am not sure that the efficiency is the same, i.e. whether the offsets of messages which should be deleted (due to the time- or size-based log retention policy) will all be below the low_watermark from the leader.

For example, say both the follower and the leader have messages with offsets in the range [0, 2000]. If the follower does log rolling slightly later than the leader, the segments on the follower would be [0, 1001], [1002, 2000] and the segments on the leader would be [0, 1000], [1001, 2000]. After the leader deletes the first segment, the low_watermark would be 1001. Thus the first segment would stay on the follower's disk unnecessarily, which may double disk usage at worst.

Since this approach doesn't save us much, I am inclined to not include this change, to keep the KIP simple.

Dong
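A note on the checkpoint file mentioned in point 1 above: the existing replication-offset-checkpoint file is a plain-text file containing a version number, an entry count, and then one "topic partition offset" line per partition. Assuming the new replication-low-watermark-checkpoint file follows the same layout, with the last column holding the partition's low_watermark, its contents might look like the sketch below; the topic names and values here are made up for illustration only.

    0
    3
    mytopic 0 845
    mytopic 1 1020
    othertopic 0 0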
On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the proposal. Looks good overall. A couple of comments.

1. Where is the low_watermark checkpointed? Is that in replication-offset-checkpoint? If so, do we need to bump up the version? Could you also describe the format change?

2. For topics with "delete" retention, currently we let each replica delete old segments independently. With low_watermark, we could just let leaders delete old segments through the deletion policy, and the followers would simply delete old segments based on low_watermark. Not sure if this saves much, but it is a potential option that may be worth thinking about.

Jun

On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenbl...@gmail.com> wrote:

One more example of complicated config: mirror maker.

We definitely can't trust each and every topic owner to configure their topics not to purge before they've been mirrored. That would mean there is a per-topic config (set by the owner) and a "global" config (where mirror makers are specified), and they need to be "merged" for those topics that _are_ mirrored, which is a changing set of topics that is stored in an external system outside of Kafka. If a topic is taken out of the mirror set, the MM offset would be "frozen" at that point and prevent clean-up for all eternity, unless it is cleaned up itself.

...

Complexity :-)

On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenbl...@gmail.com> wrote:

In summary: I am not opposed to the idea of a per-topic clean-up config that tracks some set of consumer groups' offsets (which would probably work for 80% of use cases), but I definitely see a need to expose a simple API for the more advanced/obscure/custom use cases (the other 20%).

On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenbl...@gmail.com> wrote:

A major motivation for this KIP is cost savings.

Lots of internal systems at LI use Kafka as an intermediate pipe, and set the topic retention period to a "safe enough" amount of time to be able to recover from crashes/downtime and catch up to "now". This typically results in a few days' worth of retention.

However, under normal operating conditions the consumers are mostly caught up, so early clean-up enables big cost savings in storage.

As for my points:

1. When discussing implementation options for automatic clean-up, we realized that cleaning up by keeping track of offsets stored in Kafka requires some per-topic config: you need to specify which groups to track. This becomes a problem because:

1.1 - relatively complicated code, to be written in the broker.

1.2 - the configuration needs to be kept up to date by topic "owners", of which we have thousands. Failure to do so would decrease the cost benefit.

1.3 - some applications have a "reconsume" / "reinit" / "bootstrap" workflow where they will reset their offsets to an earlier value than the one stored. This means that a stored offset of X does not always mean you can clean up to X-1. Think of it as video encoding: some apps have "key frames" they may seek back to which are before their current offset.
1.4 - there are multiple possible strategies: you could clean up aggressively, retain some "time distance" from the latest offset, some "offset distance", etc. We think this would have made it very hard to agree on a single "correct" implementation that everyone would be happy with. It would be better to include the raw functionality in the API and leave the "brains" to an external monitoring system where people can custom-tailor their logic.

2. Ad-hoc consumer groups: it is common practice for devs to spin up console consumers and connect to a topic as a debug aid. SREs may also do this. There are also various other ecosystem applications that may consume from topics (unknown to topic owners, as those are infra monitoring tools). Obviously such consumer groups' offsets should be ignored for purposes of clean-up, but coming up with a bullet-proof way to do this is non-trivial and again ties into the implementation complexity and inflexibility of a "one size fits all" solution, as in 1.4 above.

3. Forceful clean-up: we have workflows that use Kafka to move gigantic blobs from offline Hadoop processing flows into online systems. The data being "loaded" into such an online system can be several GBs in size and take a long time to consume (it is sliced into many small msgs). Sometimes the sender wants to abort and start a new blob before the current load process has completed, meaning the consumer's offsets are not yet caught up.

4. Offsets outside of Kafka: yes, you could force applications to store their offsets twice, but that is inefficient. It is better to expose a raw, simple API and let such applications manage their own clean-up logic (this again ties into 1.4 and the lack of a "one size fits all" solution).
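To make the "keep the brains outside the broker" idea in 1.4 concrete, below is a minimal sketch of how an external clean-up service might compute a safe purge offset per partition from the committed offsets of an explicitly configured set of tracked groups (so the ad-hoc console-consumer groups of point 2 are simply never tracked), applying one example policy of a fixed safety lag. Only standard consumer APIs are used; the group list, partitions, and the policy itself are illustrative assumptions, not part of the KIP. The actual purge call is left to the API proposed in the KIP and sketched further below.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Sketch of the policy side of an external clean-up service: for each
    // partition, take the minimum committed offset across an explicitly
    // configured set of tracked groups and back off by a safety lag. The
    // resulting offsets are what such a service would hand to the purge API
    // proposed in this KIP (the purge call itself is not shown here).
    public class PurgeOffsetPolicy {

        public static Map<TopicPartition, Long> safePurgeOffsets(String bootstrapServers,
                                                                 List<String> trackedGroups,
                                                                 List<TopicPartition> partitions,
                                                                 long safetyLag) {
            Map<TopicPartition, Long> result = new HashMap<>();
            for (String group : trackedGroups) {
                Properties props = new Properties();
                props.put("bootstrap.servers", bootstrapServers);
                props.put("group.id", group);
                props.put("key.deserializer", ByteArrayDeserializer.class.getName());
                props.put("value.deserializer", ByteArrayDeserializer.class.getName());
                try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                    for (TopicPartition tp : partitions) {
                        OffsetAndMetadata committed = consumer.committed(tp);
                        // A tracked group that has not committed anything blocks purging.
                        long candidate = (committed == null)
                                ? 0L
                                : Math.max(0L, committed.offset() - safetyLag);
                        // Keep the minimum candidate across all tracked groups.
                        result.merge(tp, candidate, Math::min);
                    }
                }
            }
            return result;
        }
    }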
On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:

On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Ewen,

Thanks for the review. As Radai explained, it would be complex in terms of user configuration if we were to use the committed offset to decide data deletion. We need a way to specify which groups need to consume data of this partition. The broker will also need to consume the entire offsets topic in that approach, which has some overhead. I don't think it is that hard to implement, but it will likely take more time to discuss that approach due to the new config and the server-side overhead.

We choose to put this API in AdminClient because the API is more like an administrative operation (such as listGroups, deleteTopics) than a consumer operation. It is not necessarily called by a consumer only. For example, we can implement the "delete data before committed offset" approach by running an external service which calls the purgeDataBefore() API based on the committed offsets of consumer groups.

I am not aware that AdminClient is not a public API. Supposing it is not public now, I assume we plan to make it public in the future as part of KIP-4. Are we not making it public because its interface is not stable? If so, can we just tag this new API as not stable in the code?

The AdminClient planned for KIP-4 is a new Java-based implementation. It's definitely confusing that both will be (could be?) named AdminClient, but we've kept the existing Scala AdminClient out of the public API and have not required KIPs for changes to it.

That said, I agree that if this type of API makes it into Kafka, having a (new, Java-based) AdminClient method would definitely be a good idea. An alternative path might be to have a Consumer-based implementation, since that seems like a very intuitive, natural way to use the protocol. I think optimizing for the expected use case would be a good idea.

-Ewen

Are you saying that the Scala AdminClient is not a public API and we discourage addition of any new feature to this class?

I still prefer to add it to AdminClient (the Java version in the future and the Scala version in the short term) because I feel it belongs to admin operations rather than the KafkaConsumer interface. For example, if in the future we implement the "delete data before committed offset" strategy in an external service, I feel it is a bit awkward if the service has to instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our expected use case doesn't necessarily bind this API to the consumer.

I am not strong on this issue. Let's see what other committers/developers think about this.

Thanks,
Dong
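To round out the expected use case described above, here is a rough sketch of such an external service invoking the proposed API. The method name purgeDataBefore() comes from the KIP discussion, but the PurgeClient interface, its exact signature, and the return handling below are placeholders invented for illustration; the real entry point would be whatever the AdminClient (Scala now, Java later per KIP-4) ends up exposing.

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical glue code only. "PurgeClient" is a placeholder for the
    // purgeDataBefore() entry point proposed in KIP-107; its exact shape here
    // is an assumption, not the KIP's final interface.
    public class PurgeServiceSketch {

        // Placeholder for the proposed API: purge all messages whose offset is
        // strictly below the given offset for each partition.
        interface PurgeClient extends AutoCloseable {
            void purgeDataBefore(Map<TopicPartition, Long> purgeBeforeOffsets) throws Exception;
        }

        // One iteration of the service: safeOffsets could come from a policy
        // like the committed-offset calculation sketched earlier in this thread.
        static void runOnce(PurgeClient admin, Map<TopicPartition, Long> safeOffsets) throws Exception {
            if (!safeOffsets.isEmpty()) {
                admin.purgeDataBefore(safeOffsets);
            }
        }
    }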
On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

Dong,

Looks like that's an internal link. https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient is the right one.

I have a question about one of the rejected alternatives:

"Using committed offset instead of an extra API to trigger data purge operation."

The KIP says this would be more complicated to implement. Why is that? I think brokers would have to consume the entire offsets topic, but the data stored in memory doesn't seem to change, and applying this when updated offsets are seen seems basically the same. It might also be possible to make it work even with multiple consumer groups if that was desired (although that'd require tracking more data in memory) as a generalization, without requiring coordination between the consumer groups. Given the motivation, I'm assuming this was considered unnecessary since this specifically targets intermediate stream processing topics.

Another question is why expose this via AdminClient (which isn't public API afaik)? Why not, for example, expose it on the Consumer, which is presumably where you'd want access to it since the functionality depends on the consumer actually having consumed the data?

-Ewen

On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi all,

We created KIP-107 to propose addition of purgeDataBefore() API in AdminClient.

Please find the KIP wiki in the link https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal. We would love to hear your comments and suggestions.

Thanks,
Dong

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125