Hi all,

While implementing the patch, I realized that the current usage of
"low_watermark" is a bit confusing. So I have made the following interface
changes in the KIP:

- The newly added checkpoint file will be named log-begin-offset-checkpoint.
- Replace low_watermark with log_begin_offset in FetchRequestPartition and
  FetchResponsePartitionHeader.

The problem with the previous naming convention is that low_watermark
implies the minimum log begin offset of all replicas (similar to the high
watermark), and we return this value in the PurgeResponse. In other words,
low_watermark cannot be incremented if a follower is not live. Therefore we
cannot use low_watermark in the checkpoint file, or in the FetchResponse
from the leader to followers, if we want to persist the offset-to-purge
received from the user across broker bounces.

You can find the changes in the KIP here
<https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67636826&selectedPageVersions=13&selectedPageVersions=14>.
Please let me know if you have any concern with this change.

Thanks,
Dong
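(For reference, Kafka's existing checkpoint files use a simple line-oriented
text format: a version number, the number of entries, and then one
"topic partition offset" line per partition. Assuming the new
log-begin-offset-checkpoint file follows the same layout, its contents would
look something like the sketch below; the topic names and offsets are made up.)

    0                <- format version
    2                <- number of entries
    mytopic 0 5000   <- topic, partition, log begin offset
    mytopic 1 4200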
On Mon, Jan 23, 2017 at 11:20 AM, Dong Lin <lindon...@gmail.com> wrote:

> Thanks for the comment Jun.
>
> Yeah, I think there are use cases where this can be useful. Allowing for
> an asynchronous delete will be useful if an application doesn't need a
> strong guarantee from purgeDataBefore(), e.g. if it is done to help reduce
> the disk usage of Kafka. The application may want to purge data every time
> it does an auto-commit, without waiting for the future object to complete.
> On the other hand, a synchronous delete will be useful if an application
> wants to make sure that the sensitive or bad data is definitely deleted.
> I think returning a future makes both choices available to the user, and
> it doesn't complicate the implementation much.
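>
> (To make the two styles concrete, here is a minimal sketch. The KIP
> proposes purgeDataBefore() returning a
> Future<Map<TopicPartition, PurgeDataResult>>; the adminClient variable,
> the TopicPartition tp, and the offsets below are hypothetical.)
>
>     // Synchronous: block until the purge is confirmed, for the strong
>     // guarantee. get() may throw InterruptedException/ExecutionException.
>     Future<Map<TopicPartition, PurgeDataResult>> future =
>         adminClient.purgeDataBefore(Collections.singletonMap(tp, 12345L));
>     Map<TopicPartition, PurgeDataResult> results = future.get();
>
>     // Asynchronous: fire and forget, e.g. alongside each auto-commit,
>     // when the goal is merely to reclaim disk space.
>     adminClient.purgeDataBefore(Collections.singletonMap(tp, committedOffset));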
> On Mon, Jan 23, 2017 at 10:45 AM, Jun Rao <j...@confluent.io> wrote:
>
>> I feel that it's simpler to just keep the format of the checkpoint file
>> as it is and add a separate checkpoint for the low watermark. The low
>> watermark and the high watermark are maintained independently, so I am
>> not sure there is a significant benefit to storing them together.
>>
>> Looking at the KIP again, I actually have another question on the API.
>> Is there any benefit to returning a Future in the purgeDataBefore() API?
>> Since admin APIs are used infrequently, it seems simpler to just have a
>> blocking API that returns Map<TopicPartition, PurgeDataResult>.
>>
>> Thanks,
>>
>> Jun
>>
>> On Sun, Jan 22, 2017 at 3:56 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Thanks for the comment Guozhang. Please don't worry about being late.
>> > I would be glad to update the KIP if there is a clear benefit to the
>> > new approach. I am wondering if there is any use case or operational
>> > aspect that would benefit from it.
>> >
>> > I am not saying that these checkpoint files have the same priority. I
>> > mentioned the other checkpoint files to suggest that it is OK to add
>> > one more. To me, three checkpoint files is not much different from
>> > four. I am just inclined not to update the KIP if the only benefit is
>> > to avoid the addition of a new checkpoint file.
>> >
>> > On Sun, Jan 22, 2017 at 3:48 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > > To me the distinction between the recovery checkpoint and the
>> > > replication checkpoint is different from the distinction between
>> > > these two hw checkpoint values: when a broker starts up and acts as
>> > > the leader for a partition, it can live without seeing the recovery
>> > > checkpoint; it just cannot rely on the existing last log segment and
>> > > needs to fetch from other replicas. But if the replication-checkpoint
>> > > file is missing, it is a correctness issue, as the broker does not
>> > > know from where to truncate its data, nor how to respond to a fetch
>> > > request. That is why I think we can separate these two types of
>> > > files, since the latter is more important than the former.
>> > >
>> > > That being said, I do not want to call another vote on this, since
>> > > it is my bad for not responding before the vote was called. I just
>> > > wanted to point out for the record that with this approach there may
>> > > be operational scenarios where one of the replication files is
>> > > missing and we need to handle them specially.
>> > >
>> > > Guozhang
>> > >
>> > > On Sun, Jan 22, 2017 at 1:56 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > >
>> > > > Yeah, your solution of adding new APIs certainly works, and I
>> > > > don't think that is an issue. On the other hand, I don't think it
>> > > > is an issue to add a new checkpoint file either, since we already
>> > > > have multiple checkpoint files. The benefit of the new approach
>> > > > you mentioned is probably not an issue in the current approach,
>> > > > since the high watermark and the low watermark work completely
>> > > > independently. Since there is no strong reason to choose either of
>> > > > them, I am inclined to choose the one that requires less format
>> > > > change and is simpler in the Java API. The current approach seems
>> > > > better w.r.t. this minor criterion.
>> > > >
>> > > > If you feel strongly that we should use the new approach, I can do
>> > > > that as well. Please let me know if you think so, and I will need
>> > > > to ask Jun/Joel/Becket to vote on this again, since it changes the
>> > > > interface of the KIP.
>> > > >
>> > > > On Sun, Jan 22, 2017 at 9:35 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > >
>> > > > > I think this is less of an issue: we can use the same patterns
>> > > > > as in the request protocol, i.e.:
>> > > > >
>> > > > >     write(Map[TP, Long])              // write the checkpoint in v0 format
>> > > > >     write(Map[TP, Pair[Long, Long]])  // write the checkpoint in v1 format
>> > > > >
>> > > > >     CheckpointedOffsets read()        // read the file relying on its version id
>> > > > >
>> > > > >     class CheckpointedOffsets {
>> > > > >         Integer getVersion();
>> > > > >         Long getFirstOffset();
>> > > > >         Long getSecondOffset();       // would return NOT_AVAILABLE with v0 format
>> > > > >     }
>> > > > >
>> > > > > As I think of it, another benefit is that we won't have a
>> > > > > partition with only one of the watermarks in case of a failure
>> > > > > in between writing two files.
>> > > > >
>> > > > > Guozhang
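>> > > > >
>> > > > > (A rough Java rendering of the sketch above, with hypothetical
>> > > > > names and an illustrative field order, to show how a
>> > > > > version-dispatched read could work:)
>> > > > >
>> > > > >     class CheckpointedOffsets {
>> > > > >         static final long NOT_AVAILABLE = -1L;
>> > > > >         final int version;        // 0 = existing format, 1 = two offsets per partition
>> > > > >         final long firstOffset;   // the v0 offset (high watermark)
>> > > > >         final long secondOffset;  // the added offset (low watermark); NOT_AVAILABLE in v0
>> > > > >
>> > > > >         CheckpointedOffsets(int version, long first, long second) {
>> > > > >             this.version = version;
>> > > > >             this.firstOffset = first;
>> > > > >             this.secondOffset = second;
>> > > > >         }
>> > > > >     }
>> > > > >
>> > > > >     // Parse one per-partition line: "topic partition offset" in v0,
>> > > > >     // "topic partition offset offset" in v1.
>> > > > >     static CheckpointedOffsets parseLine(int version, String line) {
>> > > > >         String[] f = line.trim().split("\\s+");
>> > > > >         long first = Long.parseLong(f[2]);
>> > > > >         long second = (version >= 1) ? Long.parseLong(f[3])
>> > > > >                                      : CheckpointedOffsets.NOT_AVAILABLE;
>> > > > >         return new CheckpointedOffsets(version, first, second);
>> > > > >     }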
>> > > > >
>> > > > > On Sun, Jan 22, 2017 at 12:03 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > >
>> > > > > > Hey Guozhang,
>> > > > > >
>> > > > > > Thanks for the review :) Yes, it is possible to combine them.
>> > > > > > Both solutions will have the same performance, but I think the
>> > > > > > current solution gives us a simpler Java class design. Note
>> > > > > > that we would have to change the Java API (e.g. read() and
>> > > > > > write()) of the OffsetCheckpoint class in order to provide a
>> > > > > > map from TopicPartition to a pair of integers when we write to
>> > > > > > the checkpoint file. This makes the class less generic, since
>> > > > > > that API is not used by the log recovery checkpoint and the
>> > > > > > log cleaner checkpoint, which also use the OffsetCheckpoint
>> > > > > > class.
>> > > > > >
>> > > > > > Dong
>> > > > > >
>> > > > > > On Sat, Jan 21, 2017 at 12:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi Dong,
>> > > > > > >
>> > > > > > > Sorry for being late on reviewing this KIP. It LGTM overall,
>> > > > > > > but I'm wondering if we can avoid adding the
>> > > > > > > "replication-low-watermark-checkpoint" file by just bumping
>> > > > > > > up the version number of "replication-offset-checkpoint" to
>> > > > > > > let it have two values for each partition, i.e.:
>> > > > > > >
>> > > > > > >     1                                        // version number
>> > > > > > >     [number of partitions]
>> > > > > > >     [topic name] [partition id] [lwm] [hwm]
>> > > > > > >
>> > > > > > > This will affect the upgrade path a bit, but I think not by
>> > > > > > > much, and all other logic will not be affected.
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On Wed, Jan 18, 2017 at 6:12 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Thanks to everyone who voted and provided feedback!
>> > > > > > > >
>> > > > > > > > This KIP is now adopted with 3 binding +1s (Jun, Joel,
>> > > > > > > > Becket) and 2 non-binding +1s (Radai, Mayuresh).
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Dong
>> > > > > > > >
>> > > > > > > > On Wed, Jan 18, 2017 at 6:05 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > >
>> > > > > > > > > Hi, Dong,
>> > > > > > > > >
>> > > > > > > > > Thanks for the update. +1
>> > > > > > > > >
>> > > > > > > > > Jun
>> > > > > > > > >
>> > > > > > > > > On Wed, Jan 18, 2017 at 1:44 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jun,
>> > > > > > > > > >
>> > > > > > > > > > After some more thinking, I agree with you that it is
>> > > > > > > > > > better to simply throw OffsetOutOfRangeException and
>> > > > > > > > > > not update low_watermark if offsetToPurge is larger
>> > > > > > > > > > than high_watermark.
>> > > > > > > > > >
>> > > > > > > > > > My use case for allowing low_watermark > high_watermark
>> > > > > > > > > > in 2(b) was to allow the user to purge all the data in
>> > > > > > > > > > the log, even if that data is not fully replicated to
>> > > > > > > > > > followers. An offset higher than high_watermark may be
>> > > > > > > > > > returned to the user either through the producer's
>> > > > > > > > > > RecordMetadata, or through the ListOffsetResponse if
>> > > > > > > > > > the from_consumer option is false. However, this may
>> > > > > > > > > > cause problems in the case of unclean leader election,
>> > > > > > > > > > or when a consumer seeks to the largest offset of the
>> > > > > > > > > > partition. It would complicate this KIP if we were to
>> > > > > > > > > > address these two problems.
>> > > > > > > > > >
>> > > > > > > > > > At this moment I prefer to keep this KIP simple by
>> > > > > > > > > > requiring low_watermark <= high_watermark. The caveat
>> > > > > > > > > > is that if the user does want to purge *all* the data
>> > > > > > > > > > that has already been produced, then he needs to stop
>> > > > > > > > > > all producers that are producing into this topic, wait
>> > > > > > > > > > long enough for all followers to catch up, and then
>> > > > > > > > > > purge data using the latest offset of this partition,
>> > > > > > > > > > i.e. high_watermark. We can revisit this if some strong
>> > > > > > > > > > use case comes up in the future.
>> > > > > > > > > >
>> > > > > > > > > > I also updated the KIP to allow the user to use offset
>> > > > > > > > > > -1L to indicate high_watermark in the PurgeRequest. In
>> > > > > > > > > > the future we can allow users to use offset -2L to
>> > > > > > > > > > indicate that they want to purge all data up to
>> > > > > > > > > > logEndOffset.
>> > > > > > > > > >
>> > > > > > > > > > Thanks!
>> > > > > > > > > > Dong
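>> > > > > > > > > >
>> > > > > > > > > > (A minimal sketch of these semantics; the names and
>> > > > > > > > > > the helper are hypothetical, and the real broker would
>> > > > > > > > > > surface the error as OffsetOutOfRangeException in the
>> > > > > > > > > > PurgeResponse rather than throw locally:)
>> > > > > > > > > >
>> > > > > > > > > >     // -1L in the PurgeRequest means "purge up to the high watermark".
>> > > > > > > > > >     static final long PURGE_TO_HIGH_WATERMARK = -1L;
>> > > > > > > > > >
>> > > > > > > > > >     static long resolvePurgeOffset(long requested, long highWatermark) {
>> > > > > > > > > >         if (requested == PURGE_TO_HIGH_WATERMARK)
>> > > > > > > > > >             return highWatermark;  // purge everything consumers can see
>> > > > > > > > > >         if (requested > highWatermark)
>> > > > > > > > > >             throw new IllegalStateException("offset beyond high watermark");
>> > > > > > > > > >         return requested;
>> > > > > > > > > >     }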
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Jan 18, 2017 at 10:37 AM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi, Dong,
>> > > > > > > > > > >
>> > > > > > > > > > > For 2(b), it seems a bit weird to allow highWatermark
>> > > > > > > > > > > to be smaller than lowWatermark. Also, from the
>> > > > > > > > > > > consumer's perspective, messages are available only
>> > > > > > > > > > > up to highWatermark. What if we simply throw
>> > > > > > > > > > > OffsetOutOfRangeException if offsetToPurge is larger
>> > > > > > > > > > > than highWatermark?
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > >
>> > > > > > > > > > > Jun
>> > > > > > > > > > >
>> > > > > > > > > > > On Tue, Jan 17, 2017 at 9:54 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Jun,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thank you. Please see my answers below. The KIP is
>> > > > > > > > > > > > updated to answer these questions (see here
>> > > > > > > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67636826&selectedPageVersions=5&selectedPageVersions=6>).
>> > > > > > > > > > > >
>> > > > > > > > > > > > 1. Yes, in this KIP we wait for all replicas. This
>> > > > > > > > > > > > is the same as if a producer sends a message with
>> > > > > > > > > > > > ack=all and isr=all_replicas. So it seems that the
>> > > > > > > > > > > > comparison is OK?
>> > > > > > > > > > > >
>> > > > > > > > > > > > 2. Good point! I hadn't thought about the case
>> > > > > > > > > > > > where the user-specified offset > logEndOffset.
>> > > > > > > > > > > > Please see the answers below.
>> > > > > > > > > > > >
>> > > > > > > > > > > > a) If offsetToPurge < lowWatermark, the first
>> > > > > > > > > > > > condition of DelayedOperationPurgatory will be
>> > > > > > > > > > > > satisfied immediately when the broker receives the
>> > > > > > > > > > > > PurgeRequest. The broker will send a PurgeResponse
>> > > > > > > > > > > > to the admin client immediately. The response maps
>> > > > > > > > > > > > this partition to the lowWatermark.
>> > > > > > > > > > > >
>> > > > > > > > > > > > This case is covered as the first condition of
>> > > > > > > > > > > > DelayedOperationPurgatory in the current KIP.
>> > > > > > > > > > > >
>> > > > > > > > > > > > b) If highWatermark < offsetToPurge < logEndOffset,
>> > > > > > > > > > > > the leader will send a FetchResponse with
>> > > > > > > > > > > > low_watermark=offsetToPurge. The follower records
>> > > > > > > > > > > > offsetToPurge as its low_watermark and sends a
>> > > > > > > > > > > > FetchRequest to the leader with the new
>> > > > > > > > > > > > low_watermark. The leader will then send a
>> > > > > > > > > > > > PurgeResponse to the admin client which maps this
>> > > > > > > > > > > > partition to the new low_watermark. The data in the
>> > > > > > > > > > > > range [highWatermark, offsetToPurge] will still be
>> > > > > > > > > > > > appended from the leader to the followers but will
>> > > > > > > > > > > > not be exposed to consumers. And for a short period
>> > > > > > > > > > > > of time the low_watermark on the follower will be
>> > > > > > > > > > > > higher than its highWatermark.
>> > > > > > > > > > > >
>> > > > > > > > > > > > This case is also covered in the current KIP, so no
>> > > > > > > > > > > > change is required.
>> > > > > > > > > > > >
>> > > > > > > > > > > > c) If logEndOffset < offsetToPurge, the leader will
>> > > > > > > > > > > > send a PurgeResponse to the admin client
>> > > > > > > > > > > > immediately. The response maps this partition to
>> > > > > > > > > > > > OffsetOutOfRangeException.
>> > > > > > > > > > > >
>> > > > > > > > > > > > This case is not covered by the current KIP. I just
>> > > > > > > > > > > > added it as the second condition for the
>> > > > > > > > > > > > PurgeRequest to be removed from
>> > > > > > > > > > > > DelayedOperationPurgatory (in the Proposed Changes
>> > > > > > > > > > > > section). Since the PurgeRequest is satisfied
>> > > > > > > > > > > > immediately when the leader receives it, it
>> > > > > > > > > > > > actually won't be put into the
>> > > > > > > > > > > > DelayedOperationPurgatory.
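>> > > > > > > > > > > >
>> > > > > > > > > > > > (Putting (a)-(c) together, a hypothetical sketch of
>> > > > > > > > > > > > the leader-side dispatch; all names are
>> > > > > > > > > > > > illustrative:)
>> > > > > > > > > > > >
>> > > > > > > > > > > >     void handlePurgeRequest(TopicPartition tp, long offsetToPurge) {
>> > > > > > > > > > > >         if (offsetToPurge > logEndOffset(tp)) {
>> > > > > > > > > > > >             respondWithError(tp, "OffsetOutOfRange");  // case (c)
>> > > > > > > > > > > >         } else if (offsetToPurge <= lowWatermark(tp)) {
>> > > > > > > > > > > >             respond(tp, lowWatermark(tp));             // case (a): satisfied immediately
>> > > > > > > > > > > >         } else {
>> > > > > > > > > > > >             // case (b) and the normal path: wait in the purgatory until
>> > > > > > > > > > > >             // every follower's low_watermark has reached offsetToPurge
>> > > > > > > > > > > >             purgatory.tryCompleteElseWatch(new DelayedPurge(tp, offsetToPurge), keys(tp));
>> > > > > > > > > > > >         }
>> > > > > > > > > > > >     }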
>> > > > > > > > > > > >
>> > > > > > > > > > > > 3. Yes, lowWatermark will be used when the
>> > > > > > > > > > > > smallest_offset option is used in the
>> > > > > > > > > > > > ListOffsetRequest. I just updated the Proposed
>> > > > > > > > > > > > Changes section to specify this.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > Dong
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Tue, Jan 17, 2017 at 6:53 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi, Dong,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks for the KIP. Looks good overall. Just a
>> > > > > > > > > > > > > few more comments.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 1. "Note that the way the broker handles a
>> > > > > > > > > > > > > PurgeRequest is similar to how it handles a
>> > > > > > > > > > > > > ProduceRequest with ack = -1 and
>> > > > > > > > > > > > > isr=all_replicas". It seems that the
>> > > > > > > > > > > > > implementation is a bit different. In this KIP,
>> > > > > > > > > > > > > we wait for all replicas. But in the producer,
>> > > > > > > > > > > > > acks=all means waiting for all in-sync replicas.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 2. Could you describe the behavior when the
>> > > > > > > > > > > > > specified offsetToPurge is (a) smaller than
>> > > > > > > > > > > > > lowWatermark, (b) larger than highWatermark but
>> > > > > > > > > > > > > smaller than the log end offset, (c) larger than
>> > > > > > > > > > > > > the log end offset?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 3. In the ListOffsetRequest, will lowWatermark be
>> > > > > > > > > > > > > returned when the smallest_offset option is used?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Jun
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Wed, Jan 11, 2017 at 1:01 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi all,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > It seems that there is no further concern with
>> > > > > > > > > > > > > > KIP-107. At this point we would like to start
>> > > > > > > > > > > > > > the voting process. The KIP can be found at
>> > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > Dong
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > >
>> > > --
>> > > -- Guozhang