Hi Mayuresh,

low_watermark will be updated when log retention fires on the broker. It may also be updated on the follower when the follower receives a FetchResponse from the leader, and it may be updated on the leader when the leader receives a PurgeRequest from the admin client.
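To make the three update paths concrete, here is a minimal sketch in Scala. The class and method names (PartitionLog, onRetention, onFetchResponse, onPurgeRequest) are made up for illustration; this is not the actual broker code or the interface proposed in the KIP. The point is only that low_watermark moves forward monotonically under all three triggers:

object LowWatermarkSketch {

  // A partition log reduced to the minimum needed for this example:
  // the current low_watermark and the start offsets of the on-disk segments.
  class PartitionLog(var lowWatermark: Long, var segmentStartOffsets: List[Long]) {

    // Path 1: log retention fires on the broker and deletes segments whose data
    // is entirely below `retainFrom`; low_watermark rises to the start of the
    // oldest remaining segment.
    def onRetention(retainFrom: Long): Unit = {
      segmentStartOffsets = segmentStartOffsets.filter(_ >= retainFrom) match {
        case Nil       => List(retainFrom) // everything deleted; start a fresh segment
        case remaining => remaining
      }
      lowWatermark = math.max(lowWatermark, segmentStartOffsets.min)
    }

    // Path 2 (follower): adopt the leader's low_watermark carried in a FetchResponse.
    // It never moves backwards, so a stale value from the leader is ignored.
    def onFetchResponse(leaderLowWatermark: Long): Unit =
      lowWatermark = math.max(lowWatermark, leaderLowWatermark)

    // Path 3 (leader): a PurgeRequest from the admin client advances low_watermark
    // to the requested offset, even if that offset falls inside a segment.
    // (The exact off-by-one semantics, PurgeOffset vs PurgeOffset+1, are simplified here.)
    def onPurgeRequest(purgeOffset: Long): Unit =
      lowWatermark = math.max(lowWatermark, purgeOffset)

    // Messages below low_watermark are no longer exposed to consumers.
    def isVisible(offset: Long): Boolean = offset >= lowWatermark
  }

  def main(args: Array[String]): Unit = {
    val log = new PartitionLog(lowWatermark = 0L, segmentStartOffsets = List(0L, 1000L, 2000L))
    log.onRetention(1000L)     // retention removes the segment starting at 0 -> low_watermark = 1000
    log.onPurgeRequest(1500L)  // purge lands mid-segment                     -> low_watermark = 1500
    log.onFetchResponse(1200L) // stale value from the leader                 -> stays at 1500
    println(s"low_watermark = ${log.lowWatermark}, offset 1100 visible = ${log.isVisible(1100L)}")
  }
}

Running main prints low_watermark = 1500 and reports offset 1100 as not visible: a purge in the middle of a segment advances low_watermark past that offset even though the segment itself has not been deleted yet.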
Thanks,
Dong

On Wed, Jan 11, 2017 at 7:37 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> Hi Dong,
> As per "If the message's offset is below low_watermark, then it should have been deleted by log retention policy." ---> I am not sure if I understand this correctly. Do you mean to say that the low_watermark will be updated only when the log retention fires on the broker?
> Thanks,
> Mayuresh
> On Tue, Jan 10, 2017 at 2:56 PM, Dong Lin <lindon...@gmail.com> wrote:
> > Bump up. I am going to initiate the vote if there is no further concern with the KIP.
> > On Fri, Jan 6, 2017 at 11:23 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > Hey Mayuresh,
> > > Thanks for the comment. If the message's offset is below low_watermark, then it should have been deleted by log retention policy. Thus it is OK not to expose this message to consumers. Does this answer your question?
> > > Thanks,
> > > Dong
> > > On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > >> Hi Dong,
> > >> Thanks for the KIP.
> > >> I had a question (which might have been answered before).
> > >> 1) The KIP says that the low_water_mark will be updated periodically by the broker like high_water_mark. Essentially we want to use low_water_mark for cases where an entire segment cannot be deleted because maybe the segment_start_offset < PurgeOffset < segment_end_offset, in which case we will set the low_water_mark to PurgeOffset+1.
> > >> 2) The KIP also says that messages below low_water_mark will not be exposed for consumers, which does make sense since we want to say that data below low_water_mark is purged.
> > >> Looking at the above conditions, does it make sense not to update the low_water_mark periodically but only on PurgeRequest? The reason being, if we update it periodically then as per 2) we will not be allowing consumers to re-consume data that is not purged but is below low_water_mark.
> > >> Thanks,
> > >> Mayuresh
> > >> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >> > Hey Jun,
> > >> > Thanks for reviewing the KIP!
> > >> > 1. The low_watermark will be checkpointed in a new file named "replication-low-watermark-checkpoint". It will have the same format as the existing replication-offset-checkpoint file. This allows us to keep the existing format of checkpoint files which maps TopicPartition to Long. I just updated the "Public Interface" section in the KIP wiki to explain this file.
> > >> > 2. I think using low_watermark from the leader to trigger log retention in the follower will work correctly in the sense that all messages with offset < low_watermark can be deleted. But I am not sure that the efficiency is the same, i.e. the offset of messages which should be deleted (i.e. due to time or size-based log retention policy) will be smaller than low_watermark from the leader.
> > >> > For example, say both the follower and the leader have messages with offsets in range [0, 2000]. If the follower does log rolling slightly later than the leader, the segments on the follower would be [0, 1001], [1002, 2000] and the segments on the leader would be [0, 1000], [1001, 2000]. After the leader deletes the first segment, the low_watermark would be 1001. Thus the first segment would stay on the follower's disk unnecessarily, which may double disk usage at worst.
> > >> > Since this approach doesn't save us much, I am inclined to not include this change to keep the KIP simple.
> > >> > Dong
> > >> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <j...@confluent.io> wrote:
> > >> > > Hi, Dong,
> > >> > > Thanks for the proposal. Looks good overall. A couple of comments.
> > >> > > 1. Where is the low_watermark checkpointed? Is that in replication-offset-checkpoint? If so, do we need to bump up the version? Could you also describe the format change?
> > >> > > 2. For topics with "delete" retention, currently we let each replica delete old segments independently. With low_watermark, we could just let leaders delete old segments through the deletion policy and the followers will simply delete old segments based on low_watermark. Not sure if this saves much, but it is a potential option that may be worth thinking about.
> > >> > > Jun
> > >> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > >> > > > one more example of complicated config - mirror maker.
> > >> > > > we definitely cant trust each and every topic owner to configure their topics not to purge before they've been mirrored.
> > >> > > > which would mean there's a per-topic config (set by the owner) and a "global" config (where mirror makers are specified) and they need to be "merged".
> > >> > > > for those topics that _are_ mirrored.
> > >> > > > which is a changing set of topics thats stored in an external system outside of kafka.
> > >> > > > if a topic is taken out of the mirror set the MM offset would be "frozen" at that point and prevent clean-up for all eternity, unless its cleaned-up itself.
> > >> > > > ...
> > >> > > > complexity :-)
> > >> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > >> > > > > in summary - i'm not opposed to the idea of a per-topic clean up config that tracks some set of consumer groups' offsets (which would probably work for 80% of use cases), but i definitely see a need to expose a simple API for the more advanced/obscure/custom use cases (the other 20%).
> > >> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > >> > > > >> a major motivation for this KIP is cost savings.
> > >> > > > >> lots of internal systems at LI use kafka as an intermediate pipe, and set the topic retention period to a "safe enough" amount of time to be able to recover from crashes/downtime and catch up to "now". this results in a few days' worth of retention typically.
> > >> > > > >> however, under normal operating conditions the consumers are mostly caught-up and so early clean-up enables a big cost savings in storage.
> > >> > > > >> as for my points:
> > >> > > > >> 1. when discussing implementation options for automatic clean-up we realized that cleaning up by keeping track of offsets stored in kafka requires some per-topic config - you need to specify which groups to track. this becomes a problem because:
> > >> > > > >> 1.1 - relatively complicated code, to be written in the broker.
> > >> > > > >> 1.2 - configuration needs to be maintained up to date by topic "owners" - of which we have thousands. failure to do so would decrease the cost benefit.
> > >> > > > >> 1.3 - some applications have a "reconsume" / "reinit" / "bootstrap" workflow where they will reset their offsets to an earlier value than the one stored. this means that a stored offset of X does not always mean you can clean up to X-1. think of it as video encoding - some apps have "key frames" they may seek back to which are before their current offset.
> > >> > > > >> 1.4 - there are multiple possible strategies - you could clean up aggressively, retain some "time distance" from latest, some "offset distance", etc. this we think would have made it very hard to agree on a single "correct" implementation that everyone would be happy with. it would be better to include the raw functionality in the API and leave the "brains" to an external monitoring system where people could custom-tailor their logic.
> > >> > > > >> 2. ad-hoc consumer groups: its common practice for devs to spin up console consumers and connect to a topic as a debug aid. SREs may also do this. there are also various other eco-system applications that may consume from topics (unknown to topic owners as those are infra monitoring tools). obviously such consumer-groups' offsets should be ignored for purposes of clean-up, but coming up with a bullet-proof way to do this is non-trivial and again ties with implementation complexity and inflexibility of a "one size fits all" solution in 1.4 above.
> > >> > > > >> 3. forceful clean-up: we have workflows that use kafka to move gigantic blobs from offline hadoop processing flows into systems. the data being "loaded" into such an online system can be several GBs in size and take a long time to consume (they are sliced into many small msgs). sometimes the sender wants to abort and start a new blob before the current load process has completed - meaning the consumer's offsets are not yet caught up.
> > >> > > > >> 4. offsets outside of kafka: yes, you could force applications to store their offsets twice, but thats inefficient. its better to expose a raw, simple API and let such applications manage their own clean-up logic (this again ties into 1.4 and no "one size fits all" solution).
> > >> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > >> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >> > > > >>> > > Hey Ewen,
> > >> > > > >>> > > Thanks for the review. As Radai explained, it would be complex in terms of user configuration if we were to use committed offset to decide data deletion. We need a way to specify which groups need to consume data of this partition. The broker will also need to consume the entire offsets topic in that approach, which has some overhead. I don't think it is that hard to implement. But it will likely take more time to discuss that approach due to the new config and the server side overhead.
> > >> > > > >>> > > We choose to put this API in AdminClient because the API is more like an administrative operation (such as listGroups, deleteTopics) than a consumer operation. It is not necessarily called by the consumer only. For example, we can implement the "delete data before committed offset" approach by running an external service which calls the purgeDataBefore() API based on the committed offset of consumer groups.
> > >> > > > >>> > > I am not aware that AdminClient is not a public API. Suppose it is not public now, I assume we plan to make it public in the future as part of KIP-4. Are we not making it public because its interface is not stable? If so, can we just tag this new API as not stable in the code?
> > >> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based implementation. It's definitely confusing that both will be (could be?) named AdminClient, but we've kept the existing Scala AdminClient out of the public API and have not required KIPs for changes to it.
> > >> > > > >>> > That said, I agree that if this type of API makes it into Kafka, having a (new, Java-based) AdminClient method would definitely be a good idea. An alternative path might be to have a Consumer-based implementation since that seems like a very intuitive, natural way to use the protocol. I think optimizing for the expected use case would be a good idea.
> > >> > > > >>> > -Ewen
> > >> > > > >>> Are you saying that the Scala AdminClient is not a public API and we discourage addition of any new feature to this class?
> > >> > > > >>> I still prefer to add it to AdminClient (Java version in the future and Scala version in the short term) because I feel it belongs to admin operation instead of the KafkaConsumer interface. For example, if in the future we implement the "delete data before committed offset" strategy in an external service, I feel it is a bit awkward if the service has to instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our expected use-case doesn't necessarily bind this API with the consumer.
> > >> > > > >>> I am not strong on this issue. Let's see what other committers/developers think about this.
> > >> > > > >>> > > Thanks,
> > >> > > > >>> > > Dong
> > >> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > >> > > > >>> > > > Dong,
> > >> > > > >>> > > > Looks like that's an internal link, https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient is the right one.
> > >> > > > >>> > > > I have a question about one of the rejected alternatives:
> > >> > > > >>> > > > > Using committed offset instead of an extra API to trigger data purge operation.
> > >> > > > >>> > > > The KIP says this would be more complicated to implement. Why is that? I think brokers would have to consume the entire offsets topic, but the data stored in memory doesn't seem to change and applying this when updated offsets are seen seems basically the same. It might also be possible to make it work even with multiple consumer groups if that was desired (although that'd require tracking more data in memory) as a generalization without requiring coordination between the consumer groups. Given the motivation, I'm assuming this was considered unnecessary since this specifically targets intermediate stream processing topics.
> > >> > > > >>> > > > Another question is why expose this via AdminClient (which isn't public API afaik)? Why not, for example, expose it on the Consumer, which is presumably where you'd want access to it since the functionality depends on the consumer actually having consumed the data?
> > >> > > > >>> > > > -Ewen
> > >> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >> > > > >>> > > > > Hi all,
> > >> > > > >>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API in AdminClient.
> > >> > > > >>> > > > > Please find the KIP wiki in the link https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal. We would love to hear your comments and suggestions.
> > >> > > > >>> > > > > Thanks,
> > >> > > > >>> > > > > Dong
> > >> --
> > >> -Regards,
> > >> Mayuresh R. Gharat
> > >> (862) 250-7125
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125