Hey Kartik,

Yes, I agree exactly with your characterization.

The question is "what is the meaning of retention?" It could mean either:
1. "Retain data that is no more than 7 days old"
2. "Retain data for 7 days from when you get it"

I don't know if either is actually a clear winner.

Each is intuitive and easily expressible:
1. "We have the last 7 days of data in Kafka"
2. "You have 7 days to get your data from Kafka from whenever it arrives"

Each has corner cases:
1. May lead to retaining data for too little time in the bootstrap case
2. May lead to over-retention in the bootstrap case

Which failure is worse probably depends on who you ask:
1. May lead to not retaining data long enough for a consumer to get it,
which the consumer would say is very bad
2. May lead to running out of disk space, which ops would say is very bad
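
To make the difference concrete, here is a minimal sketch of the two
deletion predicates (all names illustrative; this is not broker code):

    import java.util.concurrent.TimeUnit;

    // Minimal sketch contrasting the two retention semantics.
    class RetentionSemantics {
        static final long RETENTION_MS = TimeUnit.DAYS.toMillis(7);

        // 1. "Retain data that is no more than 7 days old": age is
        // measured from the timestamp carried in the message.
        static boolean expiredByCreateTime(long createTimeMs, long nowMs) {
            return nowMs - createTimeMs > RETENTION_MS;
        }

        // 2. "Retain data for 7 days from when you get it": age is
        // measured from when the broker appended the message.
        static boolean expiredByAppendTime(long appendTimeMs, long nowMs) {
            return nowMs - appendTimeMs > RETENTION_MS;
        }
    }

The two predicates agree whenever creation and append happen close
together; the corner cases above are exactly the cases where they diverge.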

-Jay

On Tue, Oct 20, 2015 at 12:02 AM, Kartik Paramasivam <kparamasi...@linkedin.com.invalid> wrote:

> Joel or Becket will probably respond in more detail, but here are my 2c.
>
> From the standpoint of LinkedIn, the suggested proposal works. In essence,
> max.append.delay can be used to turn "creationTime" into "logAppendTime".
> This does mean that at LinkedIn we won't be able to use "creationTime",
> but that might also be fine because we use the timestamp that is set
> inside the Avro payload anyway.
>
> Keeping LinkedIn aside though, it looks like there are two distinct
> possible goals:
> 1. The broker will retain messages for x days after a message shows up at
> the broker. This behavior would be completely deterministic and would
> never change depending on the contents of the message or anything else.
>
> 2. The client is in "partial" control of how long a message stays in the
> broker based on the creationTime stamped by the client.
>
> Although (2) could be a feature in some scenarios, in many others it can
> be pretty unintuitive and be perceived as an anti-feature. For example,
> say a mobile client buffered up some messages because the device was
> offline (maybe on a plane) and then sent them after, say, 23 hours. The
> messages show up in a Kafka topic with 24-hour retention, and now they
> get deleted within 1 hour.
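>
> To put rough numbers on that (a hypothetical sketch, assuming retention
> is driven by the client-stamped creation time):
>
>     import java.util.concurrent.TimeUnit;
>
>     // Hypothetical arithmetic for the offline-device example under
>     // creation-time-based retention.
>     class OfflineDeviceExample {
>         public static void main(String[] args) {
>             long retentionMs = TimeUnit.HOURS.toMillis(24);
>             long now = System.currentTimeMillis();
>             // Stamped on the device 23 hours before it came back online:
>             long createTime = now - TimeUnit.HOURS.toMillis(23);
>             long remainingMs = (createTime + retentionMs) - now;
>             // Prints 1: the message is deleted one hour after arrival.
>             System.out.println(TimeUnit.MILLISECONDS.toHours(remainingMs));
>         }
>     }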
>
> Kartik
>
>
> On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Here's my basic take:
> > - I agree it would be nice to have a notion of time baked in if it were
> > done right
> > - All the proposals so far seem pretty complex--I think they might make
> > things worse rather than better overall
> > - I think adding 2x8 byte timestamps to the message is probably a
> > non-starter from a size perspective
> > - Even if it isn't in the message, having two notions of time that
> > control different things is a bit confusing
> > - The mechanics of basing retention etc. on log append time when that's
> > not in the log seem complicated
> >
> > To that end here is a possible 4th option. Let me know what you think.
> >
> > The basic idea is that the message creation time is closest to what the
> > user actually cares about but is dangerous if set wrong. So rather than
> > substitute another notion of time, let's try to ensure the correctness
> > of message creation time by preventing arbitrarily bad message creation
> > times.
> >
> > First, let's see if we can agree that log append time is not something
> > anyone really cares about but rather an implementation detail. The
> > timestamp that matters to the user is when the message occurred (the
> > creation time). The log append time is basically just an approximation
> > to it, on the assumption that the message creation and the message
> > receipt on the server occur pretty close together.
> >
> > But as these values diverge the issue starts to become apparent. Say
> > you set the retention to one week and then mirror data from a topic
> > containing two years of data. Your intention is clearly to keep the
> > last week, but because the mirroring is appending right now you will
> > keep two years.
> >
> > The reason we like log append time is that we are (justifiably)
> > concerned that in certain situations the creation time may not be
> > trustworthy. The same problem exists on the servers, but there are
> > fewer servers and they just run the Kafka code, so it is less of an
> > issue.
> >
> > There are two possible ways to handle this:
> >
> >    1. Just tell people to add size-based retention. I think this is not
> >    entirely unreasonable; we're basically saying we retain data based on
> >    the timestamp you give us in the data. If you give us bad data we
> >    will retain it for a bad amount of time. If you want to ensure we
> >    don't retain "too much" data, define "too much" by setting a
> >    size-based retention setting. This is not entirely unreasonable but
> >    kind of suffers from a "one bad apple" problem in a very large
> >    environment.
> >    2. Prevent bad timestamps. In general we can't say a timestamp is
> >    bad. However, the definition we're implicitly using is that we think
> >    there are a set of topics/clusters where the creation timestamp
> >    should always be "very close" to the log append timestamp. This is
> >    true for data sources that have no buffering capability (which at
> >    LinkedIn is very common, but is rarer elsewhere). The solution in
> >    this case would be to allow a setting along the lines of
> >    max.append.delay which checks the creation timestamp against the
> >    server time to look for too large a divergence (a rough sketch of
> >    this check follows below). The response would either be to reject
> >    the message or to override it with the server time.
> >
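> > A rough sketch of what the option-2 check might look like on the broker
> > (max.append.delay and these names are illustrative, not an existing
> > config):
> >
> >     import java.util.concurrent.TimeUnit;
> >
> >     // Hypothetical broker-side timestamp validation for option 2.
> >     class TimestampValidator {
> >         // e.g. 10 mins on clusters with direct, unbuffered producers
> >         static final long MAX_APPEND_DELAY_MS = TimeUnit.MINUTES.toMillis(10);
> >
> >         // Returns the timestamp to write into the log.
> >         static long validate(long createTimeMs, long brokerNowMs) {
> >             if (Math.abs(brokerNowMs - createTimeMs) <= MAX_APPEND_DELAY_MS)
> >                 return createTimeMs; // within bounds: trust the client
> >             // Too much divergence: either reject the produce request
> >             // here, or override the timestamp with the server time:
> >             return brokerNowMs;
> >         }
> >     }
> >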
> > So in LI's environment you would configure the clusters used for
> > direct, unbuffered message production (e.g. tracking and metrics local)
> > to enforce a reasonably aggressive timestamp bound (say 10 mins), and
> > all other clusters would just inherit these timestamps.
> >
> > The downside of this approach is requiring the special configuration.
> > However, I think in 99% of environments this could be skipped entirely;
> > it's only when the ratio of clients to servers gets so massive that you
> > need to do this. The primary upside is that you have a single
> > authoritative notion of time which is closest to what a user would want
> > and is stored directly in the message.
> >
> > I'm also assuming there is a workable approach for indexing non-monotonic
> > timestamps, though I haven't actually worked that out.
> >
> > -Jay
> >
> > On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >
> > > Bumping up this thread although most of the discussion was on the
> > > discussion thread of KIP-31 :)
> > >
> > > I just updated the KIP page to add a detailed solution for the option
> > > (Option 3) that does not expose the LogAppendTime to the user.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
> > >
> > > The option has a minor change to the fetch request to allow fetching a
> > > time index entry as well. I kind of like this solution because it's
> > > just doing what we need without introducing other things.
> > >
> > > It would be great to hear your feedback. I can explain more during
> > > tomorrow's KIP hangout.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > > On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > >
> > > > Hi Jay,
> > > >
> > > > I just copy/pasted here your feedback on the timestamp proposal that
> > > > was in the discussion thread of KIP-31. Please see the replies
> > > > inline. The main change I made compared with the previous proposal
> > > > is to add both CreateTime and LogAppendTime to the message.
> > > >
> > > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > Hey Beckett,
> > > > >
> > > > > I was proposing splitting up the KIP just for simplicity of
> > > > > discussion. You can still implement them in one patch. I think
> > > > > otherwise it will be hard to discuss/vote on them, since if you
> > > > > like the offset proposal but not the time proposal, what do you do?
> > > > >
> > > > > Introducing a second notion of time into Kafka is a pretty massive
> > > > > philosophical change, so it kind of warrants its own KIP; it isn't
> > > > > just "Change message format".
> > > > >
> > > > > WRT time, I think one thing to clarify in the proposal is how MM
> > > > > will have access to set the timestamp. Presumably this will be a
> > > > > new field in ProducerRecord, right? If so then any user can set
> > > > > the timestamp, right? I'm not sure you answered the questions
> > > > > around how this will work for MM, since when MM retains timestamps
> > > > > from multiple partitions they will then be out of order and in the
> > > > > past (so the max(lastAppendedTimestamp, currentTimeMillis)
> > > > > override you proposed will not work, right?). If we don't do this
> > > > > then when you set up mirroring the data will all be new and you
> > > > > have the same retention problem you described. Maybe I missed
> > > > > something...?
> > > > lastAppendedTimestamp means the timestamp of the message that was
> > > > last appended to the log.
> > > > If a broker is the leader, since it assigns the timestamp by itself,
> > > > the lastAppendedTimestamp will be its local clock when it appended
> > > > the last message. So if there is no leader migration,
> > > > max(lastAppendedTimestamp, currentTimeMillis) = currentTimeMillis.
> > > > If a broker is a follower, because it keeps the leader's timestamp
> > > > unchanged, the lastAppendedTime would be the leader's clock when it
> > > > appended that message. It keeps track of the lastAppendedTime only
> > > > in case it becomes the leader later on. At that point, it is
> > > > possible that the timestamp of the last appended message was stamped
> > > > by the old leader, but the new leader's currentTimeMillis <
> > > > lastAppendedTime. If a new message comes, instead of stamping it
> > > > with the new leader's currentTimeMillis, we have to stamp it with
> > > > lastAppendedTime to avoid the timestamps in the log going backward.
> > > > The max(lastAppendedTimestamp, currentTimeMillis) is purely based on
> > > > the broker-side clock. If MM produces messages with different
> > > > LogAppendTimes from source clusters to the same target cluster, the
> > > > LogAppendTime will be ignored and re-stamped by the target cluster.
> > > > I added a use case example for mirror maker in KIP-32. There is also
> > > > a corner case discussion about when we need the
> > > > max(lastAppendedTime, currentTimeMillis) trick. Could you take a
> > > > look and see if that answers your question?
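> > > >
> > > > A minimal sketch of that stamping rule (purely illustrative, not the
> > > > actual implementation):
> > > >
> > > >     // The new leader tracks the last appended timestamp so its own
> > > >     // (possibly slower) clock can never move log timestamps backward.
> > > >     class LogAppendTimeStamper {
> > > >         private long lastAppendedTimestamp = 0L;
> > > >
> > > >         long stampNextMessage() {
> > > >             long now = System.currentTimeMillis();
> > > >             lastAppendedTimestamp = Math.max(lastAppendedTimestamp, now);
> > > >             return lastAppendedTimestamp;
> > > >         }
> > > >     }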
> > > >
> > > > >
> > > > > My main motivation is that given that both Samza and Kafka Streams
> > > > > are doing work that implies a mandatory client-defined notion of
> > > > > time, I really think introducing a different mandatory notion of
> > > > > time in Kafka is going to be quite odd. We should think hard about
> > > > > how client-defined time could work. I'm not sure if it can, but
> > > > > I'm also not sure that it can't. Having both will be odd. Did you
> > > > > chat about this with Yi/Kartik on the Samza side?
> > > > I talked with Kartik and realized that it would be useful to have a
> > > > client timestamp to facilitate use cases like stream processing.
> > > > I was trying to figure out if we can simply use the client timestamp
> > > > without introducing the server time. There is some discussion in the
> > > > KIP. The key problems we want to solve here are:
> > > > 1. We want log retention and rolling to depend on the server clock
> > > > (the broker settings involved are sketched just below this list).
> > > > 2. We want to make sure the log-associated timestamp is retained
> > > > when replicas move.
> > > > 3. We want to use the timestamp in some way that allows searching by
> > > > timestamp.
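> > > > (For concreteness, the broker settings whose behavior is at stake in
> > > > 1, with illustrative values:)
> > > >
> > > >     # Time-based retention and rolling, which we want driven by the
> > > >     # server clock:
> > > >     log.retention.hours=168
> > > >     log.roll.hours=168
> > > >     # Size-based retention, disabled here:
> > > >     log.retention.bytes=-1
> > > >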
> > > > For 1 and 2, an alternative is to pass the log-associated timestamp
> > > > through replication; that means we need a different protocol for
> > > > replica fetching to pass the log-associated timestamp. That is
> > > > actually complicated and there could be a lot of corner cases to
> > > > handle, e.g. if an old leader started to fetch from the new leader,
> > > > should it also update all of its old log segment timestamps?
> > > > I think a client-side timestamp would actually be better for 3 if we
> > > > can find a way to make it work.
> > > > So far I am not able to convince myself that only having a
> > > > client-side timestamp would work, mainly because of 1 and 2. There
> > > > are a few situations I mentioned in the KIP.
> > > > >
> > > > > When you are saying it won't work, are you assuming some
> > > > > particular implementation? Maybe that the index is a monotonically
> > > > > increasing set of pointers to the least record with a timestamp
> > > > > larger than the index time? In other words, a search for time X
> > > > > gives the largest offset at which all records are <= X?
> > > > It is a promising idea. We probably can have an in-memory index like
> > > > that, but it might be complicated to have a file on disk like that.
> > > > Imagine there are two timestamps T0 < T1. We see message Y created
> > > > at T1 and create an index entry like [T1->Y]; then we see message X
> > > > created at T0. Supposedly we should then have an index that looks
> > > > like [T0->X, T1->Y]. That is easy to do in memory, but we might have
> > > > to rewrite the index file completely. Maybe we can have the first
> > > > entry with timestamp 0, and only update the first pointer for any
> > > > out-of-range timestamp, so the index will be [0->X, T1->Y]. Also,
> > > > the ranges of timestamps in the log segments can overlap with each
> > > > other. That means we either need to keep a cross-segment index file
> > > > or we need to check the index files of all log segments.
> > > > I separated out the time-based log index into KIP-33 because it can
> > > > be an independent follow-up feature, as Neha suggested. I will try
> > > > to make the time-based index work with the client-side timestamp.
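> > > >
> > > > For instance, one in-memory shape of this (purely illustrative; it
> > > > keeps the index monotonic by simply skipping entries whose timestamp
> > > > does not exceed the last indexed one):
> > > >
> > > >     import java.util.Map;
> > > >     import java.util.TreeMap;
> > > >
> > > >     // Rough sketch of a monotonic in-memory time index.
> > > >     class TimeIndexSketch {
> > > >         // timestamp -> offset of the first message at/after it
> > > >         private final TreeMap<Long, Long> index = new TreeMap<>();
> > > >
> > > >         void maybeAppend(long timestampMs, long offset) {
> > > >             // Out-of-order creation times are simply not indexed,
> > > >             // so the file-rewrite problem never arises.
> > > >             if (index.isEmpty() || timestampMs > index.lastKey())
> > > >                 index.put(timestampMs, offset);
> > > >         }
> > > >
> > > >         // Offset of the first indexed entry at or after targetMs.
> > > >         Long lookup(long targetMs) {
> > > >             Map.Entry<Long, Long> e = index.ceilingEntry(targetMs);
> > > >             return e == null ? null : e.getValue();
> > > >         }
> > > >     }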
> > > > >
> > > > > For retention, I agree with the problem you point out, but I
> > > > > think what you are saying in that case is that you want a size
> > > > > limit too. If you use system time you actually hit the same
> > > > > problem: say you do a full dump of a DB table with a setting of 7
> > > > > days retention; your retention will actually not get enforced for
> > > > > the first 7 days because the data is "new to Kafka".
> > > > I kind of think the size limit here is orthogonal. It is a valid
> > > > use case where people want to use time-based retention only. In
> > > > your example, depending on the client timestamp might break the
> > > > functionality - say it is a bootstrap case where people actually
> > > > need to read all the data. If we depend on the client timestamp, the
> > > > data might be deleted instantly when it comes to the broker. It
> > > > might be too demanding to expect the broker to understand what
> > > > people actually want to do with the data coming in. So the guarantee
> > > > of using the server-side timestamp is that "after being appended to
> > > > the log, all messages will be available on the broker for the
> > > > retention time", which is not changeable by clients.
> > > > >
> > > > > -Jay
> > > >
> > > > On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > > >
> > > >> Hi folks,
> > > >>
> > > >> This proposal was previously in KIP-31 and we separated it into
> > > >> KIP-32 per Neha and Jay's suggestion.
> > > >>
> > > >> The proposal is to add the following two timestamps to the Kafka
> > > >> message:
> > > >> - CreateTime
> > > >> - LogAppendTime
> > > >>
> > > >> The CreateTime will be set by the producer and will not change
> > > >> after that. The LogAppendTime will be set by the broker for
> > > >> purposes such as enforcing log retention and log rolling.
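> > > >>
> > > >> Roughly, a message would then carry something like this (field
> > > >> names illustrative, not a final wire format):
> > > >>
> > > >>     // Illustrative sketch of a message carrying both timestamps.
> > > >>     class TimestampedMessage {
> > > >>         final long createTime;    // set by the producer; never changes
> > > >>         final long logAppendTime; // set by the broker on append
> > > >>         final byte[] key;
> > > >>         final byte[] value;
> > > >>
> > > >>         TimestampedMessage(long createTime, long logAppendTime,
> > > >>                            byte[] key, byte[] value) {
> > > >>             this.createTime = createTime;
> > > >>             this.logAppendTime = logAppendTime;
> > > >>             this.key = key;
> > > >>             this.value = value;
> > > >>         }
> > > >>     }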
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >>
> > > >
> > >
> >
>
