Hi Jason,

That actually sounds like a pretty good idea to me. No doubt if we use this
approach, some comments need to be added that indicate this.
But all things considered, I think it's not bad at all.

I definitely agree with you that it's a little hacky, but it works.

Cheers,
Richard

On Tue, Sep 24, 2019 at 10:44 AM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Richard,
>
> It would be unsatisfying to make a big change to the checkpointing logic in
> order to handle only one case of this problem, right?
>
> I did have one idea about how to do this. It's a bit of a hack, but keep an
> open mind ;). The basic problem is having somewhere to embed the delete
> horizon for each batch. In the v2 format, each batch header contains two
> timestamps: the base timestamp and the max timestamp. Each record in the
> batch contains a timestamp delta which is relative to the base timestamp.
> In other words, to get the record timestamp, you add the record delta to
> the base timestamp.
>
> Typically there is no reason for the base timestamp to be different from
> the timestamp of the first message, but this is not a strict requirement.
> As long as you can get to the record timestamp by adding the base timestamp
> and delta, then we are good. So the idea is to set the base timestamp to
> the delete horizon and adjust the deltas accordingly. We could then use one
> bit from the batch attributes to indicate when the base timestamp had been
> set to the delete horizon. There would be no change to the batch max
> timestamp, so indexing would not be affected by this change.
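>
> For what it's worth, here is a rough, purely illustrative sketch of the
> invariant this relies on (the class and variable names are made up, not the
> actual record format code): as long as every delta is shifted by the same
> amount that the base moves, base + delta still yields the original record
> timestamp.
>
> // Illustrative only: moving the base timestamp to the delete horizon
> // preserves record timestamps if the deltas are shifted to match.
> public class RebaseSketch {
>     public static void main(String[] args) {
>         long baseTimestamp = 1_569_300_000_000L;      // original base (first record's timestamp)
>         long[] deltas = {0L, 5L, 12L};                // per-record deltas relative to the base
>
>         long deleteHorizonMs = 1_569_386_400_000L;    // e.g. cleaning time + delete.retention.ms
>         long shift = baseTimestamp - deleteHorizonMs; // how far the base moved (may be negative)
>
>         for (long delta : deltas) {
>             long newDelta = delta + shift;            // adjusted delta stored in the record
>             // invariant: the reconstructed record timestamp is unchanged
>             System.out.println((deleteHorizonMs + newDelta) + " == " + (baseTimestamp + delta));
>         }
>         // a single attributes bit (as described above) would mark that the
>         // base timestamp now carries the delete horizon
>     }
> }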
>
> So the logic would look something like this when cleaning the log.
>
> Case 1: Normal batch
>
> a. If the delete horizon flag is set, then retain tombstones as long as the
> current time is before the horizon.
> b. If no delete horizon is set, then retain tombstones and set the delete
> horizon in the cleaned batch to current time +
> log.cleaner.delete.retention.ms.
>
> Case 2: Control batch
>
> a. If the delete horizon flag is set, then retain the batch and the marker
> as long as the current time is before the horizon.
> b. If no delete horizon is set and there are no records remaining from the
> transaction, then retain the marker and set the delete horizon in the
> cleaned batch to current time + log.cleaner.delete.retention.ms.
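>
> To make the two cases concrete, here is a rough sketch of the decision in
> code. The interface and method names are invented for illustration only;
> they are not the actual cleaner internals.
>
> // Illustrative sketch only; names are invented, not Kafka's cleaner code.
> interface Batch {
>     boolean isControlBatch();
>     boolean hasDeleteHorizonFlag();
>     long baseTimestamp();                     // holds the delete horizon when the flag is set
>     boolean transactionHasRemainingRecords(); // any data from this transaction still in the log?
>     void setDeleteHorizon(long horizonMs);    // rebase the timestamps and set the attributes bit
> }
>
> final class RetentionSketch {
>     static boolean shouldRetain(Batch batch, long currentTimeMs, long deleteRetentionMs) {
>         if (batch.hasDeleteHorizonFlag())
>             // Case 1a / 2a: horizon already stamped; keep until it passes
>             return currentTimeMs < batch.baseTimestamp();
>         if (batch.isControlBatch() && !batch.transactionHasRemainingRecords()) {
>             // Case 2b: the transaction's records are all gone; start the clock now
>             batch.setDeleteHorizon(currentTimeMs + deleteRetentionMs);
>             return true;
>         }
>         if (!batch.isControlBatch()) {
>             // Case 1b: tombstone batch with no horizon yet; stamp one and retain
>             batch.setDeleteHorizon(currentTimeMs + deleteRetentionMs);
>             return true;
>         }
>         // control batch whose transaction still has live records: leave it alone
>         return true;
>     }
> }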
>
> What do you think?
>
> -Jason
>
>
>
> On Thu, Sep 19, 2019 at 3:21 PM Richard Yu <yohan.richard...@gmail.com>
> wrote:
>
> > Hi Jason,
> >
> > That hadn't occurred to me.
> >
> > I think I missed your comment in the discussion, so I created this KIP
> > only to resolve the problem regarding tombstones.
> > What are your thoughts? If the problem regarding transaction markers is a
> > little too complex, then we can just leave it out of the KIP and fix the
> > tombstones issue.
> >
> > Cheers,
> > Richard
> >
> > On Thu, Sep 19, 2019 at 8:47 AM Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Richard,
> > >
> > > Just reposting my comment from the JIRA:
> > >
> > > The underlying problem here also impacts the cleaning of transaction
> > > markers. We use the same delete horizon in order to tell when it is safe
> > > to remove the marker. If all the data from a transaction has been cleaned
> > > and enough time has passed beyond the delete horizon, then the marker is
> > > eligible for deletion.
> > >
> > > However, I don't think the same approach that we're proposing to fix the
> > > problem for tombstones will work for transaction markers. What we need to
> > > track is the timestamp when all the records from a transaction have been
> > > removed. That is when we start the timer for deletion. But this would be
> > > different for every transaction, and there is no guarantee that earlier
> > > transactions will be eligible for deletion before later ones. It all
> > > depends on the keys written in the transaction. I don't see an obvious
> > > way to solve this problem without some record-level bookkeeping, but I
> > > might be missing something.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Mon, Sep 9, 2019 at 7:21 PM Richard Yu <yohan.richard...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for chipping in. :)
> > > >
> > > > The description you provided is pretty apt in describing the motivation
> > > > of the KIP, so I will add it. I've made some changes to the KIP and
> > > > outlined the basic approaches of what we have so far (basically changing
> > > > the checkpoint file organization or incorporating an extra internal
> > > > header field for a record). I will expand on them shortly.
> > > >
> > > > Any comments are appreciated!
> > > >
> > > > Cheers,
> > > > Richard
> > > >
> > > > On Mon, Sep 9, 2019 at 3:10 PM Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Richard,
> > > > >
> > > > > Thanks for drafting the KIP. A few comments below.
> > > > >
> > > > > 1. We need to provide a better motivation for the KIP. The goal of the
> > > > > KIP is not to reorganize the checkpoint for log cleaning. It's just an
> > > > > implementation detail. I was thinking that we could add something like
> > > > > the following in the Motivation/Problem section.
> > > > >
> > > > > "The idea of the configuration delete.retention.ms for compacted
> > > topics
> > > > is
> > > > > to prevent an application that has read a key to not see a
> subsequent
> > > > > deletion of the key because it's physically removed too early. To
> > solve
> > > > > this problem, from the latest possible time (deleteHorizonMs) that
> an
> > > > > application could have read a non tombstone key before a tombstone,
> > we
> > > > > preserve that tombstone for at least delete.retention.ms and
> require
> > > the
> > > > > application to complete the reading of the tombstone by then.
> > > > >
> > > > > deleteHorizonMs is no later than the time when the cleaner has
> > cleaned
> > > up
> > > > > to the tombstone. After that time, no application can read a
> > > > non-tombstone
> > > > > key before the tombstone because they have all been cleaned away
> > > through
> > > > > compaction. Since currently we don't explicitly store the time
> when a
> > > > round
> > > > > of cleaning completes, deleteHorizonMs is estimated by the last
> > > modified
> > > > > time of the segment containing firstDirtyOffset. When merging
> > multiple
> > > > log
> > > > > segments into a single one, the last modified time is inherited
> from
> > > the
> > > > > last merged segment. So the last modified time of the newly merged
> > > > segment
> > > > > is actually not an accurate estimate of deleteHorizonMs. It could
> be
> > > > > arbitrarily before (KAFKA-4545 <
> > https://issues.apache.org/jira/browse/
> > > >)
> > > > > or
> > > > > after (KAFKA-8522 <
> https://issues.apache.org/jira/browse/KAFKA-8522
> > >).
> > > > The
> > > > > former causes the tombstone to be deleted too early, which can
> cause
> > an
> > > > > application to miss the deletion of a key. The latter causes the
> > > > tombstone
> > > > > to be retained longer than needed and potentially forever."
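> > > > >
> > > > > (As a minimal illustration of the rule above, with an invented helper
> > > > > name rather than actual cleaner code:)
> > > > >
> > > > > final class TombstoneRule {
> > > > >     // a tombstone may be physically removed only after
> > > > >     // delete.retention.ms has elapsed past deleteHorizonMs
> > > > >     static boolean safeToRemoveTombstone(long nowMs, long deleteHorizonMs, long deleteRetentionMs) {
> > > > >         return nowMs > deleteHorizonMs + deleteRetentionMs;
> > > > >     }
> > > > > }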
> > > > >
> > > > > We probably want to change the title of the KIP accordingly.
> > > > >
> > > > > 2. The proposed implementation of the KIP is to remember the
> > > > > firstDirtyOffset and the corresponding cleaning time in a checkpoint
> > > > > file per partition and then use them to estimate deleteHorizonMs. It
> > > > > would be useful to document the format of the new checkpoint file and
> > > > > how it will be used during cleaning. Some examples will be helpful.
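> > > > >
> > > > > (Not to presume the format, but purely to illustrate the kind of example
> > > > > that might help: a hypothetical per-partition file, keeping the version
> > > > > line convention of the existing checkpoint files, could be as small as a
> > > > > version line followed by one "firstDirtyOffset cleaningTimeMs" line,
> > > > > e.g.:)
> > > > >
> > > > > 0
> > > > > 1245 1569344640000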
> > > > >
> > > > > 3. Thinking about this more, there is another way to solve this
> > > > > problem. We could write the deleteHorizonMs for each tombstone as an
> > > > > internal header field of the record (e.g., __deleteHorizonMs). That
> > > > > timestamp could be the starting time of the log cleaner when the
> > > > > tombstone's offset is <= firstDirtyOffset. We could use this timestamp
> > > > > to determine whether the tombstone should be removed in subsequent
> > > > > rounds of cleaning. This way, we can still keep the current per-disk
> > > > > checkpoint file, which is more efficient. Personally, I think this
> > > > > approach may be better. Could you document this approach in the wiki as
> > > > > well so that we can discuss which one to pick?
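> > > > >
> > > > > (A hedged sketch of what the header variant could look like, using the
> > > > > public Header/RecordHeader types from the clients library purely for
> > > > > illustration; the cleaner would do this internally rather than through
> > > > > any public API, and __deleteHorizonMs is the hypothetical name from
> > > > > above:)
> > > > >
> > > > > import java.nio.ByteBuffer;
> > > > >
> > > > > import org.apache.kafka.common.header.Header;
> > > > > import org.apache.kafka.common.header.internals.RecordHeader;
> > > > >
> > > > > // Illustrative only: attach the delete horizon to a tombstone as an
> > > > > // internal record header, then read it back in later cleaning rounds.
> > > > > final class DeleteHorizonHeader {
> > > > >     static final String KEY = "__deleteHorizonMs";
> > > > >
> > > > >     static Header create(long deleteHorizonMs) {
> > > > >         byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(deleteHorizonMs).array();
> > > > >         return new RecordHeader(KEY, value);
> > > > >     }
> > > > >
> > > > >     static long read(Header header) {
> > > > >         return ByteBuffer.wrap(header.value()).getLong();
> > > > >     }
> > > > >
> > > > >     static boolean safeToRemove(Header header, long nowMs, long deleteRetentionMs) {
> > > > >         // removable once delete.retention.ms has elapsed past the recorded horizon
> > > > >         return nowMs > read(header) + deleteRetentionMs;
> > > > >     }
> > > > > }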
> > > > >
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Sun, Sep 1, 2019 at 7:45 PM Richard Yu <yohan.richard...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > A KIP has been written that proposes upgrading the checkpoint file
> > > > > > system in the log cleaner.
> > > > > > If anybody wishes to comment, feel free to do so. :)
> > > > > >
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Reorganize+checkpoint+file+system+in+log+cleaner+to+be+per+partition
> > > > > > Above is the link for reference.
> > > > > >
> > > > > > Cheers,
> > > > > > Richard
> > > > > >
> > > > >
> > > >
> > >
> >
>
