Hi Dong,

Thanks for the KIP.

I had a question (which might have been answered before).

1) The KIP says that the low_water_mark will be updated periodically by the
broker, like the high_water_mark.
Essentially, we want to use the low_water_mark for cases where an entire
segment cannot be deleted because, for example, segment_start_offset <
PurgeOffset < segment_end_offset, in which case we will set the
low_water_mark to PurgeOffset+1.

2) The KIP also says that messages below the low_water_mark will not be
exposed to consumers, which makes sense since we want to say that data below
the low_water_mark is purged.

Given the above, would it make sense to update the low_water_mark only on a
PurgeRequest rather than periodically?
The reason is that if we update it periodically then, as per 2), consumers
will not be able to re-consume data that has not been purged but is below
the low_water_mark.
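
To make the scenario concrete, here is a rough Python sketch (not Kafka code).
Segment, apply_purge and is_readable are names made up for illustration, and it
assumes PurgeOffset is the last offset to purge (inclusive), so that the
low_water_mark becomes PurgeOffset+1 as in 1):

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Segment:
    start_offset: int  # first offset stored in the segment
    end_offset: int    # last offset stored in the segment (inclusive)


def apply_purge(segments: List[Segment], purge_offset: int,
                low_water_mark: int) -> Tuple[List[Segment], int]:
    """Handle one hypothetical PurgeRequest.

    Assumption: purge_offset is inclusive, so the new low_water_mark is
    purge_offset + 1.
    """
    # Only segments that lie entirely at or below purge_offset are deleted.
    remaining = [s for s in segments if s.end_offset > purge_offset]
    # The low_water_mark never moves backwards.
    new_lwm = max(low_water_mark, purge_offset + 1)
    return remaining, new_lwm


def is_readable(offset: int, low_water_mark: int) -> bool:
    """Offsets below the low_water_mark are hidden from consumers."""
    return offset >= low_water_mark


# PurgeOffset falls in the middle of the second segment.
segments = [Segment(0, 1000), Segment(1001, 2000)]
remaining, lwm = apply_purge(segments, purge_offset=1500, low_water_mark=0)
```

With PurgeOffset = 1500, only the first segment is deleted, the low_water_mark
becomes 1501, and offsets 1001 to 1500 are hidden from consumers although they
are still on disk in the second segment.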

Thanks,

Mayuresh


On Fri, Jan 6, 2017 at 11:18 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks for reviewing the KIP!
>
> 1. The low_watermark will be checkpointed in a new file named
> "replication-low-watermark-checkpoint". It will have the same format as
> the existing replication-offset-checkpoint file. This allows us to keep
> the existing format of checkpoint files, which maps TopicPartition to Long.
> I just updated the "Public Interface" section in the KIP wiki to explain
> this file.
>
> 2. I think using the low_watermark from the leader to trigger log retention
> in the follower will work correctly, in the sense that all messages with
> offset < low_watermark can be deleted. But I am not sure that the efficiency
> is the same, i.e. that all messages the follower should delete (due to the
> time-based or size-based log retention policy) will have offsets smaller
> than the low_watermark from the leader.
>
> For example, say both the follower and the leader have messages with
> offsets in the range [0, 2000]. If the follower rolls its log slightly
> later than the leader, the segments on the follower would be [0, 1001],
> [1002, 2000] and the segments on the leader would be [0, 1000],
> [1001, 2000]. After the leader deletes its first segment, the low_watermark
> would be 1001. The follower's first segment still contains offset 1001, so
> it would stay on the follower's disk unnecessarily, which may double disk
> usage at worst.
>
> Since this approach doesn't save us much, I am inclined to not include this
> change to keep the KIP simple.
>
> Dong
>
>
>
> On Fri, Jan 6, 2017 at 10:05 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Thanks for the proposal. Looks good overall. A couple of comments.
> >
> > 1. Where is the low_watermark checkpointed? Is that
> > in replication-offset-checkpoint? If so, do we need to bump up the
> > version? Could you also describe the format change?
> >
> > 2. For topics with "delete" retention, currently we let each replica
> > delete old segments independently. With low_watermark, we could just let
> > leaders delete old segments through the deletion policy and the followers
> > will simply delete old segments based on low_watermark. Not sure if this
> > saves much, but it is a potential option that may be worth thinking about.
> >
> > Jun
> >
> >
> >
> > On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenbl...@gmail.com> wrote:
> >
> > > one more example of complicated config - mirror maker.
> > >
> > > we definitely can't trust each and every topic owner to configure their
> > > topics not to purge before they've been mirrored.
> > > which would mean there's a per-topic config (set by the owner) and a
> > > "global" config (where mirror makers are specified) and they need to be
> > > "merged" for those topics that _are_ mirrored,
> > > which is a changing set of topics that's stored in an external system
> > > outside of kafka.
> > > if a topic is taken out of the mirror set the MM offset would be "frozen"
> > > at that point and prevent clean-up for all eternity, unless it's
> > > cleaned up itself.
> > >
> > > ...
> > >
> > > complexity :-)
> > >
> > > On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > >
> > > > in summary - i'm not opposed to the idea of a per-topic clean up
> > > > config that tracks some set of consumer groups' offsets (which would
> > > > probably work for 80% of use cases), but i definitely see a need to
> > > > expose a simple API for the more advanced/obscure/custom use cases (the
> > > > other 20%).
> > > >
> > > > On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > > >
> > > > >> a major motivation for this KIP is cost savings.
> > > > >>
> > > > >> lots of internal systems at LI use kafka as an intermediate pipe, and
> > > > >> set the topic retention period to a "safe enough" amount of time to be
> > > > >> able to recover from crashes/downtime and catch up to "now". this
> > > > >> results in a few days' worth of retention typically.
> > > > >>
> > > > >> however, under normal operating conditions the consumers are mostly
> > > > >> caught-up and so early clean-up enables a big cost savings in storage.
> > > >>
> > > >> as for my points:
> > > >>
> > > > >> 1. when discussing implementation options for automatic clean-up we
> > > > >> realized that cleaning up by keeping track of offsets stored in kafka
> > > > >> requires some per-topic config - you need to specify which groups to
> > > > >> track. this becomes a problem because:
> > > > >>     1.1 - relatively complicated code, to be written in the broker.
> > > > >>     1.2 - configuration needs to be maintained up to date by topic
> > > > >> "owners" - of which we have thousands. failure to do so would decrease
> > > > >> the cost benefit.
> > > > >>     1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
> > > > >> workflow where they will reset their offsets to an earlier value than
> > > > >> the one stored. this means that a stored offset of X does not always
> > > > >> mean you can clean up to X-1. think of it as video encoding - some apps
> > > > >> have "key frames" they may seek back to which are before their current
> > > > >> offset.
> > > > >>     1.4 - there are multiple possible strategies - you could clean up
> > > > >> aggressively, retain some "time distance" from latest, some "offset
> > > > >> distance", etc. this we think would have made it very hard to agree on
> > > > >> a single "correct" implementation that everyone would be happy with. it
> > > > >> would be better to include the raw functionality in the API and leave
> > > > >> the "brains" to an external monitoring system where people could
> > > > >> custom-tailor their logic
> > > >>
> > > > >> 2. ad-hoc consumer groups: it's common practice for devs to spin up
> > > > >> console consumers and connect to a topic as a debug aid. SREs may also
> > > > >> do this. there are also various other eco-system applications that may
> > > > >> consume from topics (unknown to topic owners as those are infra
> > > > >> monitoring tools). obviously such consumer-groups' offsets should be
> > > > >> ignored for purposes of clean-up, but coming up with a bullet-proof way
> > > > >> to do this is non-trivial and again ties with implementation complexity
> > > > >> and inflexibility of a "one size fits all" solution in 1.4 above.
> > > >>
> > > > >> 3. forceful clean-up: we have workflows that use kafka to move gigantic
> > > > >> blobs from offline hadoop processing flows into systems. the data being
> > > > >> "loaded" into such an online system can be several GBs in size and take
> > > > >> a long time to consume (they are sliced into many small msgs).
> > > > >> sometimes the sender wants to abort and start a new blob before the
> > > > >> current load process has completed - meaning the consumer's offsets are
> > > > >> not yet caught up.
> > > >>
> > > > >> 4. offsets outside of kafka: yes, you could force applications to store
> > > > >> their offsets twice, but that's inefficient. it's better to expose a
> > > > >> raw, simple API and let such applications manage their own clean-up
> > > > >> logic (this again ties into 1.4 and no "one size fits all" solution)
> > > >>
> > > > >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >>
> > > > >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > > >>>
> > > > >>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >>> >
> > > >>> > > Hey Ewen,
> > > >>> > >
> > > > >>> > > Thanks for the review. As Radai explained, it would be complex in
> > > > >>> > > terms of user configuration if we were to use committed offset to
> > > > >>> > > decide data deletion. We need a way to specify which groups need to
> > > > >>> > > consume data of this partition. The broker will also need to consume
> > > > >>> > > the entire offsets topic in that approach which has some overhead. I
> > > > >>> > > don't think it is that hard to implement. But it will likely take
> > > > >>> > > more time to discuss that approach due to the new config and the
> > > > >>> > > server side overhead.
> > > >>> > >
> > > > >>> > > We chose to put this API in AdminClient because the API is more like
> > > > >>> > > an administrative operation (such as listGroups, deleteTopics) than a
> > > > >>> > > consumer operation. It is not necessarily called by the consumer
> > > > >>> > > only. For example, we can implement the "delete data before committed
> > > > >>> > > offset" approach by running an external service which calls the
> > > > >>> > > purgeDataBefore() API based on the committed offsets of consumer
> > > > >>> > > groups.
> > > >>> > >
> > > > >>> > > I am not aware that AdminClient is not a public API. Suppose it is
> > > > >>> > > not public now, I assume we plan to make it public in the future as
> > > > >>> > > part of KIP-4. Are we not making it public because its interface is
> > > > >>> > > not stable? If so, can we just tag this new API as not stable in the
> > > > >>> > > code?
> > > >>> > >
> > > >>> >
> > > >>> >
> > > > >>> > The AdminClient planned for KIP-4 is a new Java-based implementation.
> > > > >>> > It's definitely confusing that both will be (could be?) named
> > > > >>> > AdminClient, but we've kept the existing Scala AdminClient out of the
> > > > >>> > public API and have not required KIPs for changes to it.
> > > >>> >
> > > > >>> > That said, I agree that if this type of API makes it into Kafka,
> > > > >>> > having a (new, Java-based) AdminClient method would definitely be a
> > > > >>> > good idea. An alternative path might be to have a Consumer-based
> > > > >>> > implementation since that seems like a very intuitive, natural way to
> > > > >>> > use the protocol. I think optimizing for the expected use case would
> > > > >>> > be a good idea.
> > > >>> >
> > > >>> > -Ewen
> > > >>> >
> > > > >>> Are you saying that the Scala AdminClient is not a public API and we
> > > > >>> discourage addition of any new feature to this class?
> > > >>>
> > > > >>> I still prefer to add it to AdminClient (Java version in the future
> > > > >>> and Scala version in the short term) because I feel it belongs to
> > > > >>> admin operation instead of KafkaConsumer interface. For example, if in
> > > > >>> the future we implement the "delete data before committed offset"
> > > > >>> strategy in an external service, I feel it is a bit awkward if the
> > > > >>> service has to instantiate a KafkaConsumer and call
> > > > >>> KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our
> > > > >>> expected use-case doesn't necessarily bind this API with consumer.
> > > >>>
> > > > >>> I am not strong on this issue. Let's see what other
> > > > >>> committers/developers think about this.
> > > >>>
> > > >>>
> > > >>> >
> > > >>> > >
> > > >>> > > Thanks,
> > > >>> > > Dong
> > > >>> > >
> > > > >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > > >>> > >
> > > >>> > > > Dong,
> > > >>> > > >
> > > > >>> > > > Looks like that's an internal link,
> > > > >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> > > > >>> > > > is the right one.
> > > >>> > > >
> > > >>> > > > I have a question about one of the rejected alternatives:
> > > >>> > > >
> > > > >>> > > > > Using committed offset instead of an extra API to trigger data
> > > > >>> > > > > purge operation.
> > > >>> > > >
> > > > >>> > > > The KIP says this would be more complicated to implement. Why is
> > > > >>> > > > that? I think brokers would have to consume the entire offsets
> > > > >>> > > > topic, but the data stored in memory doesn't seem to change and
> > > > >>> > > > applying this when updated offsets are seen seems basically the
> > > > >>> > > > same. It might also be possible to make it work even with multiple
> > > > >>> > > > consumer groups if that was desired (although that'd require
> > > > >>> > > > tracking more data in memory) as a generalization without requiring
> > > > >>> > > > coordination between the consumer groups. Given the motivation, I'm
> > > > >>> > > > assuming this was considered unnecessary since this specifically
> > > > >>> > > > targets intermediate stream processing topics.
> > > >>> > > >
> > > > >>> > > > Another question is why expose this via AdminClient (which isn't
> > > > >>> > > > public API afaik)? Why not, for example, expose it on the Consumer,
> > > > >>> > > > which is presumably where you'd want access to it since the
> > > > >>> > > > functionality depends on the consumer actually having consumed the
> > > > >>> > > > data?
> > > >>> > > >
> > > >>> > > > -Ewen
> > > >>> > > >
> > > > >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >>> > > >
> > > >>> > > > > Hi all,
> > > >>> > > > >
> > > > >>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API in
> > > > >>> > > > > AdminClient.
> > > >>> > > > >
> > > > >>> > > > > Please find the KIP wiki in the link
> > > > >>> > > > > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal.
> > > > >>> > > > > We would love to hear your comments and suggestions.
> > > >>> > > > >
> > > >>> > > > > Thanks,
> > > >>> > > > > Dong
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
