Here's my basic take:
- I agree it would be nice to have a notion of time baked in if it were
done right
- All the proposals so far seem pretty complex--I think they might make
things worse rather than better overall
- I think adding 2x8 byte timestamps to the message is probably a
non-starter from a size perspective
- Even if it isn't in the message, having two notions of time that control
different things is a bit confusing
- The mechanics of basing retention etc. on log append time, when that
time isn't stored in the log, seem complicated

To that end here is a possible 4th option. Let me know what you think.

The basic idea is that the message creation time is closest to what the
user actually cares about, but is dangerous if set wrong. So rather than
substituting another notion of time, let's try to ensure the correctness of
message creation time by preventing arbitrarily bad creation times.

First, let's see if we can agree that log append time is not something
anyone really cares about but rather an implementation detail. The
timestamp that matters to the user is when the message occurred (the
creation time). The log append time is basically just an approximation to
this, on the assumption that message creation and message receipt on the
server occur pretty close together.

But as these values diverge the issue starts to become apparent. Say you
set the retention to one week and then mirror data from a topic containing
two years of data. Your intention is clearly to keep the last week, but
because the mirroring is appending right now, you will keep two years.

The reason we like log append time is that we are (justifiably) concerned
that in certain situations the creation time may not be trustworthy. The
same problem exists on the servers, but there are fewer servers and they
just run the Kafka code, so it is less of an issue.

There are two possible ways to handle this:

   1. Just tell people to add size-based retention. I think this is not
   entirely unreasonable: we're basically saying we retain data based on
   the timestamp you give us in the data. If you give us bad data, we will
   retain it for a bad amount of time. If you want to ensure we don't
   retain "too much" data, define "too much" by setting a size-based
   retention setting. This works, but it kind of suffers from a "one bad
   apple" problem in a very large environment.
   2. Prevent bad timestamps. In general we can't say a timestamp is bad.
   However, the definition we're implicitly using is that there is a set
   of topics/clusters where the creation timestamp should always be "very
   close" to the log append timestamp. This is true for data sources that
   have no buffering capability (which at LinkedIn is very common, but is
   rarer elsewhere). The solution in this case would be to allow a setting
   along the lines of max.append.delay which checks the creation timestamp
   against the server time and looks for too large a divergence. On a
   violation we would either reject the message or override its timestamp
   with the server time (see the sketch after this list).
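
Here is a rough sketch of what that broker-side check could look like.
Everything in it is hypothetical (the method, the exception, and the
max.append.delay setting name are all made up for illustration); the point
is just to show how simple the enforcement would be:

    // Hypothetical check of the producer-set create time against the
    // server clock. maxAppendDelayMs corresponds to the max.append.delay
    // setting described above.
    long validateCreateTime(long createTimeMs, long maxAppendDelayMs,
                            boolean rejectOnViolation) {
        long serverTimeMs = System.currentTimeMillis();
        if (Math.abs(serverTimeMs - createTimeMs) > maxAppendDelayMs) {
            if (rejectOnViolation)
                throw new IllegalArgumentException(
                    "create time too far from server time");
            return serverTimeMs; // override with the broker's clock
        }
        return createTimeMs;     // accept the producer's timestamp
    }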

So in LI's environment you would configure the clusters used for direct,
unbuffered message production (e.g. tracking and metrics local) to enforce
a reasonably aggressive timestamp bound (say 10 mins), and all other
clusters would just inherit those timestamps.
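
For example, the config for the tracking/metrics clusters might look
something like the following (the setting name is made up, per the above):

    # hypothetical: bound the create-time/server-time divergence at 10 min
    max.append.delay.ms=600000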

The downside of this approach is requiring the special configuration.
However, I think in 99% of environments this could be skipped entirely;
it's only when the ratio of clients to servers gets massive that you need
to do this. The primary upside is that you have a single authoritative
notion of time, which is closest to what a user would want and is stored
directly in the message.

I'm also assuming there is a workable approach for indexing non-monotonic
timestamps, though I haven't actually worked that out.

-Jay

On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Bumping up this thread, although most of the discussion was on the
> discussion thread of KIP-31 :)
>
> I just updated the KIP page to add a detailed solution for the option
> (Option 3) that does not expose the LogAppendTime to the user.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
>
> The option has a minor change to the fetch request to allow fetching the
> time index entry as well. I kind of like this solution because it's just
> doing what we need without introducing other things.
>
> It will be great to see the feedback. I can explain more during
> tomorrow's KIP hangout.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>
> > Hi Jay,
> >
> > I just copy/pasted here your feedback on the timestamp proposal that
> > was in the discussion thread of KIP-31. Please see the replies inline.
> > The main change I made compared with the previous proposal is to add
> > both CreateTime and LogAppendTime to the message.
> >
> > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Hey Becket,
> > >
> > > I was proposing splitting up the KIP just for simplicity of
> > > discussion. You can still implement them in one patch. I think
> > > otherwise it will be hard to discuss/vote on them, since if you like
> > > the offset proposal but not the time proposal what do you do?
> > >
> > > Introducing a second notion of time into Kafka is a pretty massive
> > > philosophical change, so I think it kind of warrants its own KIP; it
> > > isn't just "Change message format".
> > >
> > > WRT time, I think one thing to clarify in the proposal is how MM will
> > > have access to set the timestamp. Presumably this will be a new field
> > > in ProducerRecord, right? If so then any user can set the timestamp,
> > > right? I'm not sure you answered the questions around how this will
> > > work for MM, since when MM retains timestamps from multiple partitions
> > > they will then be out of order and in the past (so the
> > > max(lastAppendedTimestamp, currentTimeMillis) override you proposed
> > > will not work, right?). If we don't do this then when you set up
> > > mirroring the data will all be new and you have the same retention
> > > problem you described. Maybe I missed something...?
> > lastAppendedTimestamp means the timestamp of the message that was last
> > appended to the log.
> > If a broker is a leader, since it will assign the timestamp by itself,
> > the lastAppendedTimestamp will be its local clock when it appends the
> > last message. So if there is no leader migration,
> > max(lastAppendedTimestamp, currentTimeMillis) = currentTimeMillis.
> > If a broker is a follower, because it keeps the leader's timestamp
> > unchanged, the lastAppendedTimestamp would be the leader's clock when
> > it appended that message. The follower keeps track of the
> > lastAppendedTimestamp only in case it becomes leader later on. At that
> > point, it is possible that the timestamp of the last appended message
> > was stamped by the old leader, but the new leader's currentTimeMillis <
> > lastAppendedTimestamp. If a new message comes, instead of stamping it
> > with the new leader's currentTimeMillis, we have to stamp it with
> > lastAppendedTimestamp to avoid the timestamps in the log going backward.
> > The max(lastAppendedTimestamp, currentTimeMillis) is purely based on
> > the broker-side clock. If MM produces messages with different
> > LogAppendTimes from source clusters to the same target cluster, the
> > LogAppendTime will be ignored and re-stamped by the target cluster.
> > I added a use case example for mirror maker in KIP-32. Also there is a
> > corner case discussion about when we need the max(lastAppendedTimestamp,
> > currentTimeMillis) trick. Could you take a look and see if that answers
> > your question?
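> >
> > (To make the stamping rule concrete, the leader-side logic is
> > essentially the following sketch; the names are illustrative, not the
> > actual patch:)
> >
> >     // On append, never let timestamps in the log go backward, even if
> >     // this broker just became leader and its clock is behind the old
> >     // leader's last stamp.
> >     long stamp = Math.max(lastAppendedTimestamp,
> >                           System.currentTimeMillis());
> >     message.setLogAppendTime(stamp);  // hypothetical setter
> >     lastAppendedTimestamp = stamp;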
> >
> > >
> > > My main motivation is that given that both Samza and Kafka streams
> > > are doing work that implies a mandatory client-defined notion of
> > > time, I really think introducing a different mandatory notion of time
> > > in Kafka is going to be quite odd. We should think hard about how
> > > client-defined time could work. I'm not sure if it can, but I'm also
> > > not sure that it can't. Having both will be odd. Did you chat about
> > > this with Yi/Kartik on the Samza side?
> > I talked with Kartik and realized that it would be useful to have a
> > client timestamp to facilitate use cases like stream processing.
> > I was trying to figure out if we can simply use the client timestamp
> > without introducing the server time. There is some discussion of this
> > in the KIP.
> > The key problems we want to solve here are:
> > 1. We want log retention and rolling to depend on the server clock.
> > 2. We want to make sure the log-associated timestamp is retained when
> > replicas move.
> > 3. We want to use the timestamp in some way that allows searching by
> > timestamp.
> > For 1 and 2, an alternative is to pass the log-associated timestamp
> > through replication, but that means we need a different protocol for
> > replica fetching to pass the log-associated timestamp. That is actually
> > complicated and there could be a lot of corner cases to handle, e.g.
> > if an old leader starts to fetch from the new leader, should it also
> > update all of its old log segment timestamps?
> > I think the client-side timestamp would actually be better for 3 if we
> > can find a way to make it work.
> > So far I have not been able to convince myself that having only the
> > client-side timestamp would work, mainly because of 1 and 2. There are
> > a few situations I mentioned in the KIP.
> > >
> > > When you say it won't work, are you assuming some particular
> > > implementation? Maybe that the index is a monotonically increasing
> > > set of pointers to the least record with a timestamp larger than the
> > > index time? In other words, a search for time X gives the largest
> > > offset at which all records are <= X?
> > It is a promising idea. We probably can have an in-memory index like
> > that, but it might be complicated to maintain as a file on disk.
> > Imagine there are two timestamps T0 < T1. We see message Y created at
> > T1 and create an index like [T1->Y]; then we see message X created at
> > T0. Supposedly we should then have an index that looks like [T0->X,
> > T1->Y]. That is easy to do in memory, but we might have to rewrite the
> > index file completely. Maybe we can have the first entry with timestamp
> > 0, and only update that first pointer for any out-of-range timestamp,
> > so the index will be [0->X, T1->Y]. Also, the ranges of timestamps in
> > the log segments can overlap with each other. That means we either need
> > to keep a cross-segment index file or we need to check the index file
> > of every log segment.
> > I separated out the time-based log index into KIP-33 because it can be
> > an independent follow-up feature, as Neha suggested. I will try to make
> > the time-based index work with the client-side timestamp.
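> >
> > (A toy sketch of the in-memory version, assuming the "fold
> > out-of-range timestamps into a catch-all entry at 0" scheme above;
> > purely illustrative, not proposed code:)
> >
> >     import java.util.TreeMap;
> >
> >     TreeMap<Long, Long> timeIndex = new TreeMap<>(); // timestamp -> offset
> >
> >     void onAppend(long createTimeMs, long offset) {
> >         if (timeIndex.isEmpty() || createTimeMs > timeIndex.lastKey()) {
> >             timeIndex.put(createTimeMs, offset);  // in-order entry
> >         } else {
> >             // out of range: only touch the catch-all first entry, so
> >             // the existing entries never need rewriting
> >             timeIndex.putIfAbsent(0L, offset);
> >         }
> >     }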
> > >
> > > For retention, I agree with the problem you point out, but I think
> > > what you are saying in that case is that you want a size limit too.
> > > If you use system time you actually hit the same problem: say you do
> > > a full dump of a DB table with a setting of 7 days retention; your
> > > retention will actually not get enforced for the first 7 days because
> > > the data is "new to Kafka".
> > I kind of think the size limit here is orthogonal. It is a valid use
> > case where people want to use time-based retention only. In your
> > example, depending on the client timestamp might break the
> > functionality - say it is a bootstrap case where people actually need
> > to read all the data. If we depend on the client timestamp, the data
> > might be deleted instantly when it comes to the broker. It might be too
> > demanding to expect the broker to understand what people actually want
> > to do with the data coming in. So the guarantee of using the
> > server-side timestamp is that "after being appended to the log, all
> > messages will be available on the broker for the retention time", which
> > is not changeable by clients.
> > >
> > > -Jay
> >
> > On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com>
> > wrote:
> >
> >> Hi folks,
> >>
> >> This proposal was previously in KIP-31 and we separated it to KIP-32 per
> >> Neha and Jay's suggestion.
> >>
> >> The proposal is to add the following two timestamps to the Kafka
> >> message:
> >> - CreateTime
> >> - LogAppendTime
> >>
> >> The CreateTime will be set by the producer and will not change after
> >> that. The LogAppendTime will be set by the broker for purposes such as
> >> enforcing log retention and log rolling.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >
>
