Hey Mayuresh,

Thanks for the comment. If a message's offset is below low_watermark, then it should already have been deleted by the log retention policy, so it is OK not to expose this message to consumers. Does this answer your question?
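To make the consumer-visible effect concrete, here is a rough sketch against the current Java consumer (illustrative only -- the topic name and broker address are made up, and the exact behavior depends on the final implementation):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class LowWatermarkCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0);  // hypothetical topic
                consumer.assign(Collections.singletonList(tp));

                // beginningOffsets() returns the earliest offset the broker will serve.
                // With this KIP it should reflect low_watermark once data has been purged,
                // so a seek below it is handled like any other out-of-range seek.
                long earliest = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp);
                System.out.println("earliest fetchable offset for " + tp + ": " + earliest);
                consumer.seek(tp, earliest);
            }
        }
    }

In other words, from the consumer's point of view a purge looks just like retention kicking in earlier than usual.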
Thanks,
Dong

On Fri, Jan 6, 2017 at 4:21 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

> Hi Dong,
>
> Thanks for the KIP.
>
> I had a question (which might have been answered before).
>
> 1) The KIP says that the low_water_mark will be updated periodically by the broker like high_water_mark. Essentially we want to use low_water_mark for cases where an entire segment cannot be deleted because maybe segment_start_offset < PurgeOffset < segment_end_offset, in which case we will set the low_water_mark to PurgeOffset+1.
>
> 2) The KIP also says that messages below low_water_mark will not be exposed for consumers, which does make sense since we want to say that data below low_water_mark is purged.
>
> Looking at the above conditions, does it make sense not to update the low_water_mark periodically but only on PurgeRequest? The reason being, if we update it periodically then as per 2) we will not be allowing consumers to re-consume data that is not purged but is below low_water_mark.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks for reviewing the KIP!
> >
> > 1. The low_watermark will be checkpointed in a new file named "replication-low-watermark-checkpoint". It will have the same format as the existing replication-offset-checkpoint file. This allows us to keep the existing format of checkpoint files which maps TopicPartition to Long. I just updated the "Public Interface" section in the KIP wiki to explain this file.
> >
> > 2. I think using low_watermark from leader to trigger log retention in the follower will work correctly in the sense that all messages with offset < low_watermark can be deleted. But I am not sure that the efficiency is the same, i.e. offset of messages which should be deleted (i.e. due to time or size-based log retention policy) will be smaller than low_watermark from the leader.
> >
> > For example, say both the follower and the leader have messages with offsets in range [0, 2000]. If the follower does log rolling slightly later than leader, the segments on follower would be [0, 1001], [1002, 2000] and segments on leader would be [0, 1000], [1001, 2000]. After leader deletes the first segment, the low_watermark would be 1001. Thus the first segment would stay on follower's disk unnecessarily which may double disk usage at worst.
> >
> > Since this approach doesn't save us much, I am inclined to not include this change to keep the KIP simple.
> >
> > Dong
> >
> > On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the proposal. Looks good overall. A couple of comments.
> > >
> > > 1. Where is the low_watermark checkpointed? Is that in replication-offset-checkpoint? If so, do we need to bump up the version? Could you also describe the format change?
> > >
> > > 2. For topics with "delete" retention, currently we let each replica delete old segments independently. With low_watermark, we could just let leaders delete old segments through the deletion policy and the followers will simply delete old segments based on low_watermark. Not sure if this saves much, but is a potential option that may be worth thinking about.
> > >
> > > Jun
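(To make point 1 above concrete: the existing checkpoint files are plain text with roughly the layout below -- a version number, an entry count, and then one "topic partition offset" line per partition -- and the new replication-low-watermark-checkpoint file is expected to use the same layout. The topic name and values here are made up:)

    0
    2
    my-topic 0 1001
    my-topic 1 987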
> > >
> > > On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > >
> > > > one more example of complicated config - mirror maker.
> > > >
> > > > we definitely cant trust each and every topic owner to configure their topics not to purge before they've been mirrored. which would mean there's a per-topic config (set by the owner) and a "global" config (where mirror makers are specified) and they need to be "merged". for those topics that _are_ mirrored. which is a changing set of topics thats stored in an external system outside of kafka. if a topic is taken out of the mirror set the MM offset would be "frozen" at that point and prevent clean-up for all eternity, unless its cleaned-up itself.
> > > >
> > > > ...
> > > >
> > > > complexity :-)
> > > >
> > > > On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > > >
> > > > > in summary - i'm not opposed to the idea of a per-topic clean up config that tracks some set of consumer groups' offsets (which would probably work for 80% of use cases), but i definitely see a need to expose a simple API for the more advanced/obscure/custom use cases (the other 20%).
> > > > >
> > > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > > > >
> > > > >> a major motivation for this KIP is cost savings.
> > > > >>
> > > > >> lots of internal systems at LI use kafka as an intermediate pipe, and set the topic retention period to a "safe enough" amount of time to be able to recover from crashes/downtime and catch up to "now". this results in a few days' worth of retention typically.
> > > > >>
> > > > >> however, under normal operating conditions the consumers are mostly caught-up and so early clean-up enables a big cost savings in storage.
> > > > >>
> > > > >> as for my points:
> > > > >>
> > > > >> 1. when discussing implementation options for automatic clean-up we realized that cleaning up by keeping track of offsets stored in kafka requires some per-topic config - you need to specify which groups to track. this becomes a problem because:
> > > > >>    1.1 - relatively complicated code, to be written in the broker.
> > > > >>    1.2 - configuration needs to be maintained up to date by topic "owners" - of which we have thousands. failure to do so would decrease the cost benefit.
> > > > >>    1.3 - some applications have a "reconsume" / "reinit" / "bootstrap" workflow where they will reset their offsets to an earlier value than the one stored. this means that a stored offset of X does not always mean you can clean up to X-1. think of it as video encoding - some apps have "key frames" they may seek back to which are before their current offset.
> > > > >>    1.4 - there are multiple possible strategies - you could clean up aggressively, retain some "time distance" from latest, some "offset distance", etc. this we think would have made it very hard to agree on a single "correct" implementation that everyone would be happy with.
> > > > >> it would be better to include the raw functionality in the API and leave the "brains" to an external monitoring system where people could custom-tailor their logic
> > > > >>
> > > > >> 2. ad-hoc consumer groups: its common practice for devs to spin up console consumers and connect to a topic as a debug aid. SREs may also do this. there are also various other eco-system applications that may consume from topics (unknown to topic owners as those are infra monitoring tools). obviously such consumer-groups' offsets should be ignored for purposes of clean-up, but coming up with a bullet-proof way to do this is non-trivial and again ties with implementation complexity and inflexibility of a "one size fits all" solution in 1.4 above.
> > > > >>
> > > > >> 3. forceful clean-up: we have workflows that use kafka to move gigantic blobs from offline hadoop processing flows into online systems. the data being "loaded" into such an online system can be several GBs in size and take a long time to consume (they are sliced into many small msgs). sometimes the sender wants to abort and start a new blob before the current load process has completed - meaning the consumer's offsets are not yet caught up.
> > > > >>
> > > > >> 4. offsets outside of kafka: yes, you could force applications to store their offsets twice, but thats inefficient. its better to expose a raw, simple API and let such applications manage their own clean-up logic (this again ties into 1.4 and no "one size fits all" solution)
> > > > >>
> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >>
> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > > > >>>
> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >>> >
> > > > >>> > > Hey Ewen,
> > > > >>> > >
> > > > >>> > > Thanks for the review. As Radai explained, it would be complex in terms of user configuration if we were to use committed offset to decide data deletion. We need a way to specify which groups need to consume data of this partition. The broker will also need to consume the entire offsets topic in that approach which has some overhead. I don't think it is that hard to implement. But it will likely take more time to discuss that approach due to the new config and the server side overhead.
> > > > >>> > >
> > > > >>> > > We choose to put this API in AdminClient because the API is more like an administrative operation (such as listGroups, deleteTopics) than a consumer operation. It is not necessarily called by the consumer only. For example, we can implement the "delete data before committed offset" approach by running an external service which calls the purgeDataBefore() API based on the committed offset of consumer groups.
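To sketch what such an external service could look like (illustrative only -- purgeDataBefore() is the API proposed in this KIP and its final signature is still under discussion, and the topic/group names here are made up):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical cleanup service: read the committed offsets of one consumer group
    // and ask the brokers to purge everything below them via the proposed API.
    public class CommittedOffsetPurger {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
            props.put("group.id", "my-pipeline-group");         // hypothetical group whose progress gates the purge
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            Map<TopicPartition, Long> purgeBefore = new HashMap<>();
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                for (PartitionInfo p : consumer.partitionsFor("my-topic")) {  // hypothetical topic
                    TopicPartition tp = new TopicPartition("my-topic", p.partition());
                    OffsetAndMetadata committed = consumer.committed(tp);
                    if (committed != null) {
                        purgeBefore.put(tp, committed.offset());
                    }
                }
            }

            // purgeDataBefore(purgeBefore) would then be invoked on the admin client;
            // the exact signature is whatever the KIP finally settles on, so it is
            // left as a comment here rather than a concrete call.
            // adminClient.purgeDataBefore(purgeBefore);
        }
    }

Whether this policy logic lives in an external service like the above or is built into the broker is exactly the complexity/flexibility trade-off being discussed in this thread.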
> > > > >>> > >
> > > > >>> > > I am not aware that AdminClient is not a public API. Suppose it is not public now, I assume we plan to make it public in the future as part of KIP-4. Are we not making it public because its interface is not stable? If so, can we just tag this new API as not stable in the code?
> > > > >>> >
> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based implementation. It's definitely confusing that both will be (could be?) named AdminClient, but we've kept the existing Scala AdminClient out of the public API and have not required KIPs for changes to it.
> > > > >>> >
> > > > >>> > That said, I agree that if this type of API makes it into Kafka, having a (new, Java-based) AdminClient method would definitely be a good idea. An alternative path might be to have a Consumer-based implementation since that seems like a very intuitive, natural way to use the protocol. I think optimizing for the expected use case would be a good idea.
> > > > >>> >
> > > > >>> > -Ewen
> > > > >>>
> > > > >>> Are you saying that the Scala AdminClient is not a public API and we discourage addition of any new feature to this class?
> > > > >>>
> > > > >>> I still prefer to add it to AdminClient (Java version in the future and Scala version in the short term) because I feel it belongs to admin operation instead of the KafkaConsumer interface. For example, if in the future we implement the "delete data before committed offset" strategy in an external service, I feel it is a bit awkward if the service has to instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our expected use-case doesn't necessarily bind this API with the consumer.
> > > > >>>
> > > > >>> I am not strong on this issue. Let's see what other committers/developers think about this.
> > > > >>>
> > > > >>> > > Thanks,
> > > > >>> > > Dong
> > > > >>> > >
> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > > > >>> > >
> > > > >>> > > > Dong,
> > > > >>> > > >
> > > > >>> > > > Looks like that's an internal link, https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient is the right one.
> > > > >>> > > >
> > > > >>> > > > I have a question about one of the rejected alternatives:
> > > > >>> > > >
> > > > >>> > > > > Using committed offset instead of an extra API to trigger data purge operation.
> > > > >>> > > >
> > > > >>> > > > The KIP says this would be more complicated to implement. Why is that?
> > > > >>> > > > I think brokers would have to consume the entire offsets topic, but the data stored in memory doesn't seem to change and applying this when updated offsets are seen seems basically the same. It might also be possible to make it work even with multiple consumer groups if that was desired (although that'd require tracking more data in memory) as a generalization without requiring coordination between the consumer groups. Given the motivation, I'm assuming this was considered unnecessary since this specifically targets intermediate stream processing topics.
> > > > >>> > > >
> > > > >>> > > > Another question is why expose this via AdminClient (which isn't public API afaik)? Why not, for example, expose it on the Consumer, which is presumably where you'd want access to it since the functionality depends on the consumer actually having consumed the data?
> > > > >>> > > >
> > > > >>> > > > -Ewen
> > > > >>> > > >
> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >>> > > >
> > > > >>> > > > > Hi all,
> > > > >>> > > > >
> > > > >>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API in AdminClient.
> > > > >>> > > > >
> > > > >>> > > > > Please find the KIP wiki in the link https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal. We would love to hear your comments and suggestions.
> > > > >>> > > > >
> > > > >>> > > > > Thanks,
> > > > >>> > > > > Dong
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125