Hi all,

When I am implementing the patch, I realized that the current usage of
"low_watermark" is a bit confusing. So I made the following interface
changes in the KIP:

- The newly added checkpoint file will be named log-begin-offset-checkpoint
- Replace low_watermark with log_begin_offset in FetchRequestPartition and
FetchResponsePartitionHeader

The problem with the previous naming conversion is that, low_watermark
implies minimum log begin offset of all replicas (similar to high
watermark) and we return this value in the PurgeResponse. In other words,
low_watermark can not be incremented if a follower is not live. Therefore
we can not use low_watermark in the checkpoint file or in the FetchResponse
from leader to followers if we want to persists the offset-to-purge
received from user across broker rebounce.

You can find the changes in KIP here
<https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67636826&selectedPageVersions=13&selectedPageVersions=14>.
Please let me know if you have any concern with this change.

Thanks,
Dong

On Mon, Jan 23, 2017 at 11:20 AM, Dong Lin <lindon...@gmail.com> wrote:

> Thanks for the comment Jun.
>
> Yeah, I think there is use-case where this can be useful. Allowing for
> asynchronous delete will be useful if an application doesn't need strong
> guarantee of purgeDataFrom(), e.g. if it is done to help reduce disk usage
> of kafka. The application may want to purge data for every time it does
> auto-commit without wait for future object to complete. On the other hand,
> synchronous delete will be useful if an application wants to make sure that
> the sensitive or bad data is definitely deleted. I think returning a future
> makes both choice available to user and it doesn't complicate
> implementation much.
>
>
> On Mon, Jan 23, 2017 at 10:45 AM, Jun Rao <j...@confluent.io> wrote:
>
>> I feel that it's simpler to just keep the format of the checkpoint file as
>> it is and just add a separate checkpoint for low watermark. Low watermark
>> and high watermark are maintained independently. So, not sure if there is
>> significant benefit of storing them together.
>>
>> Looking at the KIP again. I actually have another question on the api. Is
>> there any benefit of returning a Future in the purgeDataBefore() api?
>> Since
>> admin apis are used infrequently, it seems that it's simpler to just have
>> a
>> blocking api and returns Map<TopicPartition, PurgeDataResult>?
>>
>> Thanks,
>>
>> Jun
>>
>> On Sun, Jan 22, 2017 at 3:56 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Thanks for the comment Guozhang. Please don't worry about being late. I
>> > would like to update the KIP if there is clear benefit of the new
>> approach.
>> > I am wondering if there is any use-case or operation aspects that would
>> > benefit from the new approach.
>> >
>> > I am not saying that these checkpoint files have the same priority. I
>> > mentioned other checkpoint files to suggest that it is OK to add one
>> more
>> > checkpoint file. To me three checkpoint files is not much different from
>> > four checkpoint files. I am just inclined to not update the KIP if the
>> only
>> > benefit is to avoid addition of a new checkpoint file.
>> >
>> >
>> >
>> > On Sun, Jan 22, 2017 at 3:48 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >
>> > > To me the distinction between recovery-checkpoint and
>> > > replication-checkpoint are different from the distinction between
>> these
>> > two
>> > > hw checkpoint values: when broker starts up and act as the leader for
>> a
>> > > partition, it can live without seeing the recovery checkpoint, but
>> just
>> > > cannot rely on the existing last log segment and need to fetch from
>> other
>> > > replicas; but if the replication-checkpoint file is missing, it is a
>> > > correctness issue, as it does not know from where to truncate its
>> data,
>> > and
>> > > also how to respond to a fetch request. That is why I think we can
>> > separate
>> > > these two types of files, since the latter one is more important than
>> the
>> > > previous one.
>> > >
>> > > That being said, I do not want to recall another vote on this since
>> it is
>> > > my bad not responding before the vote is called. Just wanted to point
>> out
>> > > for the record that this approach may have some operational scenarios
>> > where
>> > > one of the replication files is missing and we need to treat them
>> > > specifically.
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Sun, Jan 22, 2017 at 1:56 PM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > >
>> > > > Yeah, your solution of adding new APIs certainly works and I don't
>> > think
>> > > > that is an issue. On the other hand I don't think it is an issue to
>> > add a
>> > > > new checkpoint file as well since we already have multiple
>> checkpoint
>> > > > files. The benefit of the new approach you mentioned is probably
>> not an
>> > > > issue in the current approach since high watermark and low watermark
>> > > works
>> > > > completely independently. Since there is no strong reason to choose
>> > > either
>> > > > of them, I am inclined to choose the one that makes less format
>> change
>> > > and
>> > > > simpler in the Java API. The current approach seems better w.r.t
>> this
>> > > minor
>> > > > reason.
>> > > >
>> > > > If you are strong that we should use the new approach, I can do
>> that as
>> > > > well. Please let me know if you think so, and I will need to ask
>> > > > Jun/Joel/Becket to vote on this again since this changes the
>> interface
>> > of
>> > > > the KIP.
>> > > >
>> > > > On Sun, Jan 22, 2017 at 9:35 AM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > I think this is less of an issue: we can use the same patterns as
>> in
>> > > the
>> > > > > request protocol, i.e.:
>> > > > >
>> > > > > write(Map[TP, Long]) // write the checkout point in v0 format
>> > > > > write(Map[TP, Pair[Long, Long]]) // write the checkout point in v1
>> > > format
>> > > > >
>> > > > > CheckpointedOffsets read() // read the file relying on its
>> version id
>> > > > >
>> > > > > class CheckpointedOffsets {
>> > > > >
>> > > > >     Integer getVersion();
>> > > > >     Long getFirstOffset();
>> > > > >     Long getSecondOffset();   // would return NO_AVAILABLE with v0
>> > > format
>> > > > > }
>> > > > >
>> > > > >
>> > > > > As I think of it, another benefit is that we wont have a partition
>> > that
>> > > > > only have one of the watermarks in case of a failure in between
>> > writing
>> > > > two
>> > > > > files.
>> > > > >
>> > > > > Guozhang
>> > > > >
>> > > > > On Sun, Jan 22, 2017 at 12:03 AM, Dong Lin <lindon...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > Hey Guozhang,
>> > > > > >
>> > > > > > Thanks for the review:) Yes it is possible to combine them. Both
>> > > > solution
>> > > > > > will have the same performance. But I think the current solution
>> > will
>> > > > > give
>> > > > > > us simpler Java class design. Note that we will have to change
>> Java
>> > > API
>> > > > > > (e.g. read() and write()) of OffsetCheckpoint class in order to
>> > > > provide a
>> > > > > > map from TopicPartition to a pair of integers when we write to
>> > > > checkpoint
>> > > > > > file. This makes this class less generic since this API is not
>> used
>> > > by
>> > > > > log
>> > > > > > recovery checkpoint and log cleaner checkpoint which are also
>> using
>> > > > > > OffsetCheckpoint class.
>> > > > > >
>> > > > > > Dong
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Sat, Jan 21, 2017 at 12:28 PM, Guozhang Wang <
>> > wangg...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi Dong,
>> > > > > > >
>> > > > > > > Sorry for being late on reviewing this KIP. It LGTM overall,
>> but
>> > > I'm
>> > > > > > > wondering if we can save adding the
>> "replication-low-watermark-
>> > > > > > checkpoint"
>> > > > > > > file by just bumping up the version number of
>> > "replication-offset-
>> > > > > > > checkpoint"
>> > > > > > > to let it have two values for each partition, i.e.:
>> > > > > > >
>> > > > > > > 1  // version number
>> > > > > > > [number of partitions]
>> > > > > > > [topic name] [partition id] [lwm] [hwm]
>> > > > > > >
>> > > > > > >
>> > > > > > > This will affects the upgrade path a bit, but I think not by
>> > large,
>> > > > and
>> > > > > > all
>> > > > > > > other logic will not be affected.
>> > > > > > >
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, Jan 18, 2017 at 6:12 PM, Dong Lin <
>> lindon...@gmail.com>
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Thanks to everyone who voted and provided feedback!
>> > > > > > > >
>> > > > > > > > This KIP is now adopted with 3 binding +1s (Jun, Joel,
>> Becket)
>> > > and
>> > > > 2
>> > > > > > > > non-binding +1s (Radai, Mayuresh).
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Dong
>> > > > > > > >
>> > > > > > > > On Wed, Jan 18, 2017 at 6:05 PM, Jun Rao <j...@confluent.io>
>> > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi, Dong,
>> > > > > > > > >
>> > > > > > > > > Thanks for the update. +1
>> > > > > > > > >
>> > > > > > > > > Jun
>> > > > > > > > >
>> > > > > > > > > On Wed, Jan 18, 2017 at 1:44 PM, Dong Lin <
>> > lindon...@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jun,
>> > > > > > > > > >
>> > > > > > > > > > After some more thinking, I agree with you that it is
>> > better
>> > > to
>> > > > > > > simply
>> > > > > > > > > > throw OffsetOutOfRangeException and not update
>> > low_watermark
>> > > if
>> > > > > > > > > > offsetToPurge is larger than high_watermark.
>> > > > > > > > > >
>> > > > > > > > > > My use-case of allowing low_watermark > high_watermark
>> in
>> > > 2(b)
>> > > > is
>> > > > > > to
>> > > > > > > > > allow
>> > > > > > > > > > user to purge all the data in the log even if that data
>> is
>> > > not
>> > > > > > fully
>> > > > > > > > > > replicated to followers. An offset higher than
>> > high_watermark
>> > > > may
>> > > > > > be
>> > > > > > > > > > returned to user either through producer's
>> RecordMetadata,
>> > or
>> > > > > > through
>> > > > > > > > > > ListOffsetResponse if from_consumer option is false.
>> > However,
>> > > > > this
>> > > > > > > may
>> > > > > > > > > > cause problem in case of unclean leader election or when
>> > > > consumer
>> > > > > > > seeks
>> > > > > > > > > to
>> > > > > > > > > > the largest offset of the partition. It will complicate
>> > this
>> > > > KIP
>> > > > > if
>> > > > > > > we
>> > > > > > > > > were
>> > > > > > > > > > to address these two problems.
>> > > > > > > > > >
>> > > > > > > > > > At this moment I prefer to keep this KIP simple by
>> > requiring
>> > > > > > > > > low_watermark
>> > > > > > > > > > <= high_watermark. The caveat is that if user does want
>> to
>> > > > purge
>> > > > > > > *all*
>> > > > > > > > > the
>> > > > > > > > > > data that is already produced, then he needs to stop all
>> > > > > producers
>> > > > > > > that
>> > > > > > > > > are
>> > > > > > > > > > producing into this topic, wait long enough for all
>> > followers
>> > > > to
>> > > > > > > catch
>> > > > > > > > > up,
>> > > > > > > > > > and then purge data using the latest offset of this
>> > > partition,
>> > > > > i.e.
>> > > > > > > > > > high_watermark. We can revisit this if some strong
>> use-case
>> > > > comes
>> > > > > > up
>> > > > > > > in
>> > > > > > > > > the
>> > > > > > > > > > future.
>> > > > > > > > > >
>> > > > > > > > > > I also updated the KIP to allow user to use offset -1L
>> to
>> > > > > indicate
>> > > > > > > > > > high_watermark in the PurgeRequest. In the future we can
>> > > allow
>> > > > > > users
>> > > > > > > to
>> > > > > > > > > use
>> > > > > > > > > > offset -2L to indicate that they want to purge all data
>> up
>> > to
>> > > > > > > > > logEndOffset.
>> > > > > > > > > >
>> > > > > > > > > > Thanks!
>> > > > > > > > > > Dong
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Wed, Jan 18, 2017 at 10:37 AM, Jun Rao <
>> > j...@confluent.io>
>> > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi, Dong,
>> > > > > > > > > > >
>> > > > > > > > > > > For 2(b), it seems a bit weird to allow highWatermark
>> to
>> > be
>> > > > > > smaller
>> > > > > > > > > than
>> > > > > > > > > > > lowWatermark. Also, from the consumer's perspective,
>> > > messages
>> > > > > are
>> > > > > > > > > > available
>> > > > > > > > > > > only up to highWatermark. What if we simply throw
>> > > > > > > > > > OffsetOutOfRangeException
>> > > > > > > > > > > if offsetToPurge is larger than highWatermark?
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > >
>> > > > > > > > > > > Jun
>> > > > > > > > > > >
>> > > > > > > > > > > On Tue, Jan 17, 2017 at 9:54 PM, Dong Lin <
>> > > > lindon...@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Jun,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thank you. Please see my answers below. The KIP is
>> > > updated
>> > > > to
>> > > > > > > > answer
>> > > > > > > > > > > these
>> > > > > > > > > > > > questions (see here
>> > > > > > > > > > > > <https://cwiki.apache.org/confluence/pages/
>> > > > > > > > diffpagesbyversion.action
>> > > > > > > > > ?
>> > > > > > > > > > > > pageId=67636826&selectedPageVersions=5&
>> > > > > selectedPageVersions=6>
>> > > > > > > > > > > > ).
>> > > > > > > > > > > >
>> > > > > > > > > > > > 1. Yes, in this KIP we wait for all replicas. This
>> is
>> > the
>> > > > > same
>> > > > > > as
>> > > > > > > > if
>> > > > > > > > > > > > producer sends a messge with ack=all and
>> > > isr=all_replicas.
>> > > > So
>> > > > > > it
>> > > > > > > > > seems
>> > > > > > > > > > > that
>> > > > > > > > > > > > the comparison is OK?
>> > > > > > > > > > > >
>> > > > > > > > > > > > 2. Good point! I haven't thought about the case
>> where
>> > the
>> > > > > > > > > > user-specified
>> > > > > > > > > > > > offset > logEndOffset. Please see answers below.
>> > > > > > > > > > > >
>> > > > > > > > > > > > a) If offsetToPurge < lowWatermark, the first
>> condition
>> > > > > > > > > > > > of DelayedOperationPurgatory will be satisfied
>> > > immediately
>> > > > > when
>> > > > > > > > > broker
>> > > > > > > > > > > > receives PurgeRequest. Broker will send
>> PurgeResponse
>> > to
>> > > > > admin
>> > > > > > > > client
>> > > > > > > > > > > > immediately. The response maps this partition to the
>> > > > > > > lowWatermark.
>> > > > > > > > > > > >
>> > > > > > > > > > > > This case is covered as the first condition of
>> > > > > > > > > > DelayedOperationPurgatory
>> > > > > > > > > > > in
>> > > > > > > > > > > > the current KIP.
>> > > > > > > > > > > >
>> > > > > > > > > > > > b) If highWatermark < offsetToPurge < logEndOffset,
>> > > leader
>> > > > > will
>> > > > > > > > send
>> > > > > > > > > > > > FetchResponse with low_watermark=offsetToPurge.
>> > Follower
>> > > > > > records
>> > > > > > > > the
>> > > > > > > > > > > > offsetToPurge as low_watermark and sends
>> FetchRequest
>> > to
>> > > > the
>> > > > > > > leader
>> > > > > > > > > > with
>> > > > > > > > > > > > the new low_watermark. Leader will then send
>> > > PurgeResponse
>> > > > to
>> > > > > > > admin
>> > > > > > > > > > > client
>> > > > > > > > > > > > which maps this partition to the new low_watermark.
>> The
>> > > > data
>> > > > > in
>> > > > > > > the
>> > > > > > > > > > range
>> > > > > > > > > > > > [highWatermark, offsetToPurge] will still be
>> appended
>> > > from
>> > > > > > leader
>> > > > > > > > to
>> > > > > > > > > > > > followers but will not be exposed to consumers. And
>> in
>> > a
>> > > > > short
>> > > > > > > > period
>> > > > > > > > > > of
>> > > > > > > > > > > > time low_watermark on the follower will be higher
>> than
>> > > > their
>> > > > > > > > > > > highWatermark.
>> > > > > > > > > > > >
>> > > > > > > > > > > > This case is also covered in the current KIP so no
>> > change
>> > > > is
>> > > > > > > > > required.
>> > > > > > > > > > > >
>> > > > > > > > > > > > c) If logEndOffset < offsetToPurge, leader will send
>> > > > > > > PurgeResponse
>> > > > > > > > to
>> > > > > > > > > > > admin
>> > > > > > > > > > > > client immediately. The response maps this
>> partition to
>> > > > > > > > > > > > OffsetOutOfRangeException.
>> > > > > > > > > > > >
>> > > > > > > > > > > > This case is not covered by the current KIP. I just
>> > added
>> > > > > this
>> > > > > > as
>> > > > > > > > the
>> > > > > > > > > > > > second condition for the PurgeRequest to be removed
>> > from
>> > > > > > > > > > > > DelayedOperationPurgatory (in the Proposed Change
>> > > section).
>> > > > > > Since
>> > > > > > > > the
>> > > > > > > > > > > > PurgeRequest is satisfied immediately when the
>> leader
>> > > > > receives
>> > > > > > > it,
>> > > > > > > > it
>> > > > > > > > > > > > actually won't be put into the
>> > DelayedOperationPurgatory.
>> > > > > > > > > > > >
>> > > > > > > > > > > > 3. Yes, lowWatermark will be used when
>> smallest_offset
>> > is
>> > > > > used
>> > > > > > in
>> > > > > > > > the
>> > > > > > > > > > > > ListOffsetRequest. I just updated Proposed Change
>> > section
>> > > > to
>> > > > > > > > specify
>> > > > > > > > > > > this.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > Dong
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Tue, Jan 17, 2017 at 6:53 PM, Jun Rao <
>> > > j...@confluent.io
>> > > > >
>> > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi, Dong,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks for the KIP. Looks good overall. Just a few
>> > more
>> > > > > > > comments.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 1."Note that the way broker handles PurgeRequest
>> is
>> > > > similar
>> > > > > > to
>> > > > > > > > how
>> > > > > > > > > it
>> > > > > > > > > > > > > handles ProduceRequest with ack = -1 and
>> > > > isr=all_replicas".
>> > > > > > It
>> > > > > > > > > seems
>> > > > > > > > > > > that
>> > > > > > > > > > > > > the implementation is a bit different. In this
>> KIP,
>> > we
>> > > > wait
>> > > > > > for
>> > > > > > > > all
>> > > > > > > > > > > > > replicas. But in producer, acks=all means waiting
>> for
>> > > all
>> > > > > > > in-sync
>> > > > > > > > > > > > replicas.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 2. Could you describe the behavior when the
>> specified
>> > > > > > > > offsetToPurge
>> > > > > > > > > > is
>> > > > > > > > > > > > (a)
>> > > > > > > > > > > > > smaller than lowWatermark, (b) larger than
>> > > highWatermark,
>> > > > > but
>> > > > > > > > > smaller
>> > > > > > > > > > > > than
>> > > > > > > > > > > > > log end offset, (c) larger than log end offset?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > 3. In the ListOffsetRequest, will lowWatermark be
>> > > > returned
>> > > > > > when
>> > > > > > > > the
>> > > > > > > > > > > > > smallest_offset option is used?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Jun
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Wed, Jan 11, 2017 at 1:01 PM, Dong Lin <
>> > > > > > lindon...@gmail.com
>> > > > > > > >
>> > > > > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi all,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > It seems that there is no further concern with
>> the
>> > > > > KIP-107.
>> > > > > > > At
>> > > > > > > > > this
>> > > > > > > > > > > > point
>> > > > > > > > > > > > > > we would like to start the voting process. The
>> KIP
>> > > can
>> > > > be
>> > > > > > > found
>> > > > > > > > > at
>> > > > > > > > > > > > > > https://cwiki.apache.org/confl
>> > > uence/display/KAFKA/KIP-
>> > > > > 107
>> > > > > > > > > > > > > > %3A+Add+purgeDataBefore%28%29+
>> API+in+AdminClient.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > Dong
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>
>

Reply via email to