Hi, Dong,

The changes sound good to me. Also, thanks for the explanation of returning a future from purgeDataBefore(). We can keep it that way.
Thanks,
Jun

On Mon, Jan 23, 2017 at 4:24 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi all,

When I was implementing the patch, I realized that the current usage of "low_watermark" is a bit confusing. So I made the following interface changes in the KIP:

- The newly added checkpoint file will be named log-begin-offset-checkpoint.
- Replace low_watermark with log_begin_offset in FetchRequestPartition and FetchResponsePartitionHeader.

The problem with the previous naming convention is that low_watermark implies the minimum log begin offset of all replicas (similar to the high watermark), and we return this value in the PurgeResponse. In other words, low_watermark cannot be incremented if a follower is not live. Therefore we cannot use low_watermark in the checkpoint file or in the FetchResponse from leader to followers if we want to persist the offset-to-purge received from the user across broker bounces.

You can find the changes in the KIP here
<https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67636826&selectedPageVersions=13&selectedPageVersions=14>.
Please let me know if you have any concerns with this change.

Thanks,
Dong

On Mon, Jan 23, 2017 at 11:20 AM, Dong Lin <lindon...@gmail.com> wrote:

Thanks for the comment, Jun.

Yeah, I think there is a use case where this can be useful. Allowing asynchronous deletes will be useful if an application doesn't need a strong guarantee from purgeDataBefore(), e.g. if it is done to help reduce Kafka's disk usage. The application may want to purge data every time it auto-commits, without waiting for the future to complete. On the other hand, synchronous deletes will be useful if an application wants to make sure that sensitive or bad data is definitely deleted. I think returning a future makes both choices available to the user, and it doesn't complicate the implementation much.
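A minimal Java sketch of why a Future-returning call covers both of the cases Dong describes. Everything here is a hypothetical stand-in rather than the KIP's AdminClient API: the `PurgeAdmin` class, the `purgeDataBefore()` signature, `PurgeDataResult`, and the local `TopicPartition` record are assumptions, and the "purge" just echoes the requested offsets on a background thread instead of contacting brokers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical per-partition result: the partition's new low watermark, or an error.
record PurgeDataResult(long lowWatermark, Exception error) {}

// Hypothetical admin facade. A real client would send PurgeRequests to the
// partition leaders and complete the future when the PurgeResponses arrive;
// here every requested offset is simply echoed back on a background thread.
class PurgeAdmin {
    CompletableFuture<Map<TopicPartition, PurgeDataResult>> purgeDataBefore(
            Map<TopicPartition, Long> offsetsToPurge) {
        return CompletableFuture.supplyAsync(() -> {
            Map<TopicPartition, PurgeDataResult> results = new HashMap<>();
            offsetsToPurge.forEach((tp, offset) -> results.put(tp, new PurgeDataResult(offset, null)));
            return results;
        });
    }
}

class PurgeCallers {
    // Fire-and-forget caller, e.g. trimming disk usage after each auto-commit:
    // it only logs failures and never waits for the future.
    static void purgeAsync(PurgeAdmin admin, Map<TopicPartition, Long> offsets) {
        admin.purgeDataBefore(offsets).whenComplete((results, error) -> {
            if (error != null) {
                System.err.println("purge failed: " + error);
            }
        });
    }

    // Blocking caller, e.g. when bad or sensitive data must be gone before
    // continuing: join() waits and yields the same Map a blocking API would return.
    static Map<TopicPartition, PurgeDataResult> purgeSync(
            PurgeAdmin admin, Map<TopicPartition, Long> offsets) {
        return admin.purgeDataBefore(offsets).join();
    }
}
```

The async caller never blocks, while `join()` gives the blocking caller the same `Map<TopicPartition, PurgeDataResult>` that Jun's blocking alternative would return, which is the point of keeping the Future.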
On Mon, Jan 23, 2017 at 10:45 AM, Jun Rao <j...@confluent.io> wrote:

I feel that it's simpler to just keep the format of the checkpoint file as it is and just add a separate checkpoint for the low watermark. The low watermark and the high watermark are maintained independently, so I'm not sure there is a significant benefit to storing them together.

Looking at the KIP again, I actually have another question on the API. Is there any benefit to returning a Future in the purgeDataBefore() API? Since admin APIs are used infrequently, it seems simpler to just have a blocking API that returns Map<TopicPartition, PurgeDataResult>?

Thanks,
Jun

On Sun, Jan 22, 2017 at 3:56 PM, Dong Lin <lindon...@gmail.com> wrote:

Thanks for the comment, Guozhang. Please don't worry about being late. I would like to update the KIP if there is a clear benefit to the new approach. I am wondering if there is any use case or operational aspect that would benefit from the new approach.

I am not saying that these checkpoint files have the same priority. I mentioned the other checkpoint files to suggest that it is OK to add one more. To me, three checkpoint files are not much different from four. I am just inclined not to update the KIP if the only benefit is avoiding the addition of a new checkpoint file.
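Jun's suggestion keeps the existing checkpoint format and adds a separate file. A minimal sketch of that format, assuming the version-0 layout that Guozhang's version-1 proposal extends (a version line, an entry count, then one `topic partition offset` line per partition), shows why a single-value codec can be reused for every checkpoint file. `CheckpointCodec` and the local `TopicPartition` record are hypothetical, not Kafka's `OffsetCheckpoint` class, and the sketch works on strings instead of writing files atomically as a broker would need to.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical codec for the version-0 checkpoint layout:
//   0                        <- version
//   N                        <- number of entries
//   topic partition offset   <- one line per partition, N lines in total
final class CheckpointCodec {
    static final int VERSION = 0;

    static String write(Map<TopicPartition, Long> offsets) {
        StringBuilder sb = new StringBuilder();
        sb.append(VERSION).append('\n').append(offsets.size()).append('\n');
        offsets.forEach((tp, offset) -> sb.append(tp.topic()).append(' ')
                .append(tp.partition()).append(' ').append(offset).append('\n'));
        return sb.toString();
    }

    static Map<TopicPartition, Long> read(String contents) {
        try (BufferedReader in = new BufferedReader(new StringReader(contents))) {
            int version = Integer.parseInt(nextLine(in));
            if (version != VERSION) {
                throw new IllegalArgumentException("unsupported checkpoint version " + version);
            }
            int expected = Integer.parseInt(nextLine(in));
            Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
            for (int i = 0; i < expected; i++) {
                String[] fields = nextLine(in).split("\\s+");
                if (fields.length != 3) {
                    throw new IllegalArgumentException("malformed entry: " + String.join(" ", fields));
                }
                offsets.put(new TopicPartition(fields[0], Integer.parseInt(fields[1])),
                        Long.parseLong(fields[2]));
            }
            return offsets;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the next trimmed line, failing loudly if the checkpoint is truncated.
    private static String nextLine(BufferedReader in) throws IOException {
        String line = in.readLine();
        if (line == null) {
            throw new IllegalArgumentException("truncated checkpoint");
        }
        return line.trim();
    }
}
```

With the approach the thread settled on, the high watermark stays in `replication-offset-checkpoint` and the log begin offset goes into the new `log-begin-offset-checkpoint`, each handled by the same single-value codec. The trade-off Guozhang raises is that the two files are written separately, so a crash between the writes can leave them out of step.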
On Sun, Jan 22, 2017 at 3:48 PM, Guozhang Wang <wangg...@gmail.com> wrote:

To me, the distinction between the recovery checkpoint and the replication checkpoint is different from the distinction between these two hw checkpoint values. When a broker starts up and acts as the leader for a partition, it can live without the recovery checkpoint; it just cannot rely on the existing last log segment and needs to fetch from other replicas. But if the replication-checkpoint file is missing, it is a correctness issue, as the broker does not know where to truncate its data from, or how to respond to a fetch request. That is why I think we can separate these two types of files, since the latter is more important than the former.

That being said, I do not want to call another vote on this, since it is my fault for not responding before the vote was called. I just wanted to point out, for the record, that this approach may have some operational scenarios where one of the replication files is missing and we need to treat them specially.

Guozhang

On Sun, Jan 22, 2017 at 1:56 PM, Dong Lin <lindon...@gmail.com> wrote:

Yeah, your solution of adding new APIs certainly works and I don't think that is an issue. On the other hand, I don't think adding a new checkpoint file is an issue either, since we already have multiple checkpoint files. The benefit of the new approach you mentioned is probably not an issue for the current approach, since the high watermark and low watermark work completely independently.
Since there is no strong reason to choose either of them, I am inclined to choose the one that requires fewer format changes and is simpler in the Java API. The current approach seems better for this minor reason.

If you feel strongly that we should use the new approach, I can do that as well. Please let me know if you think so, and I will need to ask Jun/Joel/Becket to vote on this again, since it changes the interface of the KIP.

On Sun, Jan 22, 2017 at 9:35 AM, Guozhang Wang <wangg...@gmail.com> wrote:

I think this is less of an issue: we can use the same patterns as in the request protocol, i.e.:

    write(Map[TP, Long])              // write the checkpoint in v0 format
    write(Map[TP, Pair[Long, Long]])  // write the checkpoint in v1 format

    CheckpointedOffsets read()        // read the file relying on its version id

    class CheckpointedOffsets {
      Integer getVersion();
      Long getFirstOffset();
      Long getSecondOffset();  // would return NO_AVAILABLE with v0 format
    }

As I think of it, another benefit is that we won't have a partition that has only one of the watermarks in case of a failure between writing the two files.

Guozhang

On Sun, Jan 22, 2017 at 12:03 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Guozhang,

Thanks for the review :) Yes, it is possible to combine them. Both solutions will have the same performance.
But I think the current solution will give us a simpler Java class design. Note that we would have to change the Java API (e.g. read() and write()) of the OffsetCheckpoint class in order to provide a map from TopicPartition to a pair of integers when we write to the checkpoint file. This makes the class less generic, since this API is not used by the log recovery checkpoint and the log cleaner checkpoint, which also use the OffsetCheckpoint class.

Dong

On Sat, Jan 21, 2017 at 12:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Dong,

Sorry for being late in reviewing this KIP. It LGTM overall, but I'm wondering if we can avoid adding the "replication-low-watermark-checkpoint" file by just bumping up the version number of "replication-offset-checkpoint" to let it have two values for each partition, i.e.:

    1 // version number
    [number of partitions]
    [topic name] [partition id] [lwm] [hwm]

This will affect the upgrade path a bit, but I think not by much, and all other logic will not be affected.

Guozhang

On Wed, Jan 18, 2017 at 6:12 PM, Dong Lin <lindon...@gmail.com> wrote:

Thanks to everyone who voted and provided feedback!
This KIP is now adopted with 3 binding +1s (Jun, Joel, Becket) and 2 non-binding +1s (Radai, Mayuresh).

Thanks,
Dong

On Wed, Jan 18, 2017 at 6:05 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the update. +1

Jun

On Wed, Jan 18, 2017 at 1:44 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi Jun,

After some more thinking, I agree with you that it is better to simply throw OffsetOutOfRangeException and not update low_watermark if offsetToPurge is larger than high_watermark.

My use case for allowing low_watermark > high_watermark in 2(b) is to let the user purge all the data in the log even if that data is not fully replicated to followers. An offset higher than high_watermark may be returned to the user either through the producer's RecordMetadata, or through ListOffsetResponse if the from_consumer option is false. However, this may cause problems in case of unclean leader election or when a consumer seeks to the largest offset of the partition.
It would complicate this KIP if we were to address these two problems.

At this moment I prefer to keep this KIP simple by requiring low_watermark <= high_watermark. The caveat is that if the user does want to purge *all* the data that has already been produced, they need to stop all producers that are producing into this topic, wait long enough for all followers to catch up, and then purge data using the latest offset of this partition, i.e. high_watermark. We can revisit this if a strong use case comes up in the future.

I also updated the KIP to allow the user to use offset -1L to indicate high_watermark in the PurgeRequest. In the future we can allow users to use offset -2L to indicate that they want to purge all data up to logEndOffset.

Thanks!
Dong

On Wed, Jan 18, 2017 at 10:37 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

For 2(b), it seems a bit weird to allow highWatermark to be smaller than lowWatermark. Also, from the consumer's perspective, messages are available only up to highWatermark. What if we simply throw OffsetOutOfRangeException if offsetToPurge is larger than highWatermark?

Thanks,
Jun

On Tue, Jan 17, 2017 at 9:54 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi Jun,

Thank you. Please see my answers below. The KIP is updated to answer these questions (see here
<https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67636826&selectedPageVersions=5&selectedPageVersions=6>).

1. Yes, in this KIP we wait for all replicas. This is the same as if the producer sends a message with ack=all and isr=all_replicas.
   So it seems that the comparison is OK?

2. Good point! I hadn't thought about the case where the user-specified offset > logEndOffset. Please see the answers below.

   a) If offsetToPurge < lowWatermark, the first condition of DelayedOperationPurgatory will be satisfied immediately when the broker receives the PurgeRequest. The broker will send a PurgeResponse to the admin client immediately. The response maps this partition to the lowWatermark.

      This case is covered as the first condition of DelayedOperationPurgatory in the current KIP.

   b) If highWatermark < offsetToPurge < logEndOffset, the leader will send a FetchResponse with low_watermark=offsetToPurge. The follower records the offsetToPurge as low_watermark and sends a FetchRequest to the leader with the new low_watermark. The leader will then send a PurgeResponse to the admin client which maps this partition to the new low_watermark.
      The data in the range [highWatermark, offsetToPurge] will still be appended from the leader to the followers but will not be exposed to consumers. And for a short period of time, low_watermark on the followers will be higher than their highWatermark.

      This case is also covered in the current KIP, so no change is required.

   c) If logEndOffset < offsetToPurge, the leader will send a PurgeResponse to the admin client immediately. The response maps this partition to OffsetOutOfRangeException.

      This case is not covered by the current KIP. I just added it as the second condition for the PurgeRequest to be removed from DelayedOperationPurgatory (in the Proposed Change section). Since the PurgeRequest is satisfied immediately when the leader receives it, it actually won't be put into the DelayedOperationPurgatory.

3. Yes, lowWatermark will be used when smallest_offset is used in the ListOffsetRequest.
   I just updated the Proposed Change section to specify this.

Thanks,
Dong

On Tue, Jan 17, 2017 at 6:53 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the KIP. Looks good overall. Just a few more comments.

1. "Note that the way broker handles PurgeRequest is similar to how it handles ProduceRequest with ack = -1 and isr=all_replicas". It seems that the implementation is a bit different. In this KIP, we wait for all replicas. But in the producer, acks=all means waiting for all in-sync replicas.

2. Could you describe the behavior when the specified offsetToPurge is (a) smaller than lowWatermark, (b) larger than highWatermark but smaller than log end offset, (c) larger than log end offset?

3. In the ListOffsetRequest, will lowWatermark be returned when the smallest_offset option is used?
Thanks,
Jun

On Wed, Jan 11, 2017 at 1:01 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi all,

It seems that there is no further concern with KIP-107. At this point we would like to start the voting process. The KIP can be found at:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient

Thanks,
Dong
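Taken together, the thread settles on a few leader-side rules: -1L in a PurgeRequest stands for the high watermark, an offsetToPurge above the high watermark is rejected with OffsetOutOfRangeException, and a purge completes once the low watermark (the minimum log begin offset across all replicas, live or not) reaches the requested offset, so a request below the current low watermark completes immediately. The sketch below restates those rules as plain Java functions. `PurgeHandling`, its methods, and the local exception class are hypothetical, and rejecting other negative offsets is this sketch's own assumption, since the thread only reserves -2L for possible future use.

```java
import java.util.Collection;

// Local stand-in for Kafka's OffsetOutOfRangeException, so the sketch is self-contained.
class OffsetOutOfRangeException extends RuntimeException {
    OffsetOutOfRangeException(String message) {
        super(message);
    }
}

// Hypothetical summary of the leader-side purge rules discussed in this thread.
final class PurgeHandling {
    // Sentinel agreed on Jan 18: -1L in a PurgeRequest means "up to the high watermark".
    static final long HIGH_WATERMARK_SENTINEL = -1L;

    // Resolves the user-supplied offset, rejecting anything above the high watermark.
    static long resolveOffsetToPurge(long requested, long highWatermark) {
        if (requested == HIGH_WATERMARK_SENTINEL) {
            return highWatermark;
        }
        if (requested < 0) {
            // Assumption of this sketch: other negative values (such as the -2L
            // reserved for future use) are not accepted yet.
            throw new IllegalArgumentException("unsupported offset " + requested);
        }
        if (requested > highWatermark) {
            throw new OffsetOutOfRangeException(
                    "offsetToPurge " + requested + " is above high watermark " + highWatermark);
        }
        return requested;
    }

    // Low watermark: the minimum log begin offset over all assigned replicas,
    // so a replica that is not live holds it back.
    static long lowWatermark(Collection<Long> replicaLogBeginOffsets) {
        return replicaLogBeginOffsets.stream().mapToLong(Long::longValue).min().orElseThrow();
    }

    // The delayed purge completes once the low watermark reaches the target; an
    // offset already below the low watermark therefore completes immediately.
    static boolean purgeComplete(long offsetToPurge, Collection<Long> replicaLogBeginOffsets) {
        return lowWatermark(replicaLogBeginOffsets) >= offsetToPurge;
    }
}
```

Waiting on every assigned replica rather than only the ISR mirrors Dong's answer to Jun's first question, and it is also why the value returned in the PurgeResponse cannot double as the per-replica log begin offset that followers persist.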