Hi Jay,

I have copied your feedback on the timestamp proposal from the KIP-31
discussion thread into this thread. Please see my replies inline.
The main change from the previous proposal is that the message now carries
both CreateTime and LogAppendTime.

On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Beckett,
>
> I was proposing splitting up the KIP just for simplicity of discussion. You
> can still implement them in one patch. I think otherwise it will be hard to
> discuss/vote on them since if you like the offset proposal but not the time
> proposal what do you do?
>
> Introducing a second notion of time into Kafka is a pretty massive
> philosophical change so it kind of warrants its own KIP; I think it isn't
> just "Change message format".
>
> WRT time I think one thing to clarify in the proposal is how MM will have
> access to set the timestamp? Presumably this will be a new field in
> ProducerRecord, right? If so then any user can set the timestamp, right?
> I'm not sure you answered the questions around how this will work for MM
> since when MM retains timestamps from multiple partitions they will then be
> out of order and in the past (so the max(lastAppendedTimestamp,
> currentTimeMillis) override you proposed will not work, right?). If we
> don't do this then when you set up mirroring the data will all be new and
> you have the same retention problem you described. Maybe I missed
> something...?
lastAppendedTimestamp is the timestamp of the message that was last
appended to the log.
If a broker is a leader, it assigns the timestamp itself, so
lastAppendedTimestamp is its local clock reading when it appended the last
message. So if there is no leader migration, max(lastAppendedTimestamp,
currentTimeMillis) = currentTimeMillis.
If a broker is a follower, it keeps the leader's timestamp unchanged, so
lastAppendedTimestamp is the leader's clock reading when the leader
appended that message. The follower tracks lastAppendedTimestamp only in
case it becomes leader later on. At that point, it is possible that the
last appended message was stamped by the old leader and the new leader's
currentTimeMillis < lastAppendedTimestamp. If a new message comes in,
instead of stamping it with the new leader's currentTimeMillis, we have to
stamp it with lastAppendedTimestamp to keep the timestamps in the log from
going backward.
The max(lastAppendedTimestamp, currentTimeMillis) is based purely on the
broker-side clock. If MM produces messages with different LogAppendTime
values from several source clusters to the same target cluster, the
LogAppendTime will be ignored and re-stamped by the target cluster.
I added a mirror maker use case example to KIP-32. There is also a
corner-case discussion of when we need the max(lastAppendedTimestamp,
currentTimeMillis) trick. Could you take a look and see if that answers
your question?
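
To make the stamping rule above concrete, here is a rough sketch. It is
only an illustration: PartitionLog, append_as_leader and append_as_follower
are made-up names, not Kafka code, and the clock is injected so the
leader-migration case can be reproduced.

```python
import time


class PartitionLog:
    """Toy model of one partition replica (hypothetical, not Kafka's classes)."""

    def __init__(self, clock=lambda: int(time.time() * 1000)):
        self.clock = clock                 # broker-local clock in milliseconds
        self.last_appended_timestamp = -1  # timestamp of the last appended message
        self.messages = []                 # list of (log_append_time, payload)

    def append_as_leader(self, payload):
        # The leader stamps the message itself and never lets the log go backward.
        ts = max(self.last_appended_timestamp, self.clock())
        self._store(ts, payload)
        return ts

    def append_as_follower(self, leader_timestamp, payload):
        # The follower keeps the leader's timestamp unchanged.
        self._store(leader_timestamp, payload)

    def _store(self, ts, payload):
        self.messages.append((ts, payload))
        self.last_appended_timestamp = ts


# Leader migration: the old leader stamped 1000, the new leader's clock reads 900.
replica = PartitionLog(clock=lambda: 900)
replica.append_as_follower(1000, "stamped by old leader")
print(replica.append_as_leader("first message after migration"))  # 1000, not 900
```

Without a leader migration the max() is a no-op, assuming the leader's own
clock does not go backward.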

>
> My main motivation is that given that both Samza and Kafka streams are
> doing work that implies a mandatory client-defined notion of time, I really
> think introducing a different mandatory notion of time in Kafka is going to
> be quite odd. We should think hard about how client-defined time could
> work. I'm not sure if it can, but I'm also not sure that it can't. Having
> both will be odd. Did you chat about this with Yi/Kartik on the Samza side?
I talked with Kartik and realized that it would be useful to have a client
timestamp to facilitate use cases like stream processing.
I was trying to figure out whether we can simply use the client timestamp
without introducing the server time. There is some discussion of this in
the KIP.
The key problems we want to solve here are:
1. We want log retention and rolling to depend on the server clock.
2. We want the log-associated timestamp to be retained when replicas move.
3. We want to use the timestamp in a way that allows searching by
timestamp.
For 1 and 2, an alternative is to pass the log-associated timestamp through
replication, which means we need a different protocol for replica fetching
to carry the log-associated timestamp. It is actually complicated and there
could be a lot of corner cases to handle, e.g. if an old leader starts to
fetch from the new leader, should it also update the timestamps of all of
its old log segments?
I actually think the client-side timestamp would be better for 3 if we can
find a way to make it work.
So far I am not able to convince myself that having only the client-side
timestamp would work, mainly because of 1 and 2. There are a few situations
I mentioned in the KIP.
>
> When you are saying it won't work you are assuming some particular
> implementation? Maybe that the index is a monotonically increasing set of
> pointers to the least record with a timestamp larger than the index time?
> In other words a search for time X gives the largest offset at which all
> records are <= X?
It is a promising idea. We can probably have an in-memory index like that,
but it might be complicated to have a file on disk like that. Imagine there
are two timestamps T0 < T1. We see message Y created at T1 and create an
index like [T1->Y]. Then we see message X created at T0; supposedly the
index should then look like [T0->X, T1->Y]. That is easy to do in memory,
but on disk we might have to rewrite the index file completely. Maybe we
can have the first entry use timestamp 0 and only update the first pointer
for any out-of-range timestamp, so the index will be [0->X, T1->Y]. Also,
the timestamp ranges of the log segments can overlap with each other. That
means we either need to keep a cross-segment index file or we need to check
the index files of all the log segments.
I separated out the time-based log index into KIP-33 because it can be an
independent follow-up feature, as Neha suggested. I will try to make the
time-based index work with the client-side timestamp.
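
A rough sketch of the "first entry at timestamp 0" idea, just to show the
update rule. AppendOnlyTimeIndex and on_append are made-up names, and I am
assuming "out of range" means a timestamp smaller than the last indexed
one; this is not a worked-out design.

```python
class AppendOnlyTimeIndex:
    """Hypothetical time index that only appends or rewrites its first entry."""

    def __init__(self):
        # entries[0] is the sentinel with timestamp 0; its offset stays None
        # until an out-of-range message has been seen.
        self.entries = [[0, None]]

    def on_append(self, timestamp, offset):
        last_indexed = self.entries[-1][0]
        if timestamp > last_indexed:
            # In-order timestamp: append a new entry, no rewrite needed.
            self.entries.append([timestamp, offset])
        elif timestamp < last_indexed:
            # Out-of-range timestamp: only the first pointer is updated.
            self.entries[0][1] = offset
        # An equal timestamp is already covered by the last entry.


T0, T1 = 100, 200          # T0 < T1
index = AppendOnlyTimeIndex()
index.on_append(T1, "Y")   # message Y created at T1 arrives first
index.on_append(T0, "X")   # message X created at T0 arrives later
print(index.entries)       # [[0, 'X'], [200, 'Y']]
```

The file never needs to be rewritten beyond its first entry, at the cost of
the index no longer knowing the real timestamp T0 of X.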
>
> For retention, I agree with the problem you point out, but I think what you
> are saying in that case is that you want a size limit too. If you use
> system time you actually hit the same problem: say you do a full dump of a
> DB table with a setting of 7 days retention, your retention will actually
> not get enforced for the first 7 days because the data is "new to Kafka".
I kind of think the size limit here is orthogonal. It is a valid use case
for people to want time-based retention only. In your example, depending on
the client timestamp might break the functionality: say it is a bootstrap
case where people actually need to read all the data. If we depend on the
client timestamp, the data might be deleted as soon as it arrives at the
broker. It might be too demanding to expect the broker to understand what
people actually want to do with the incoming data. So the guarantee of
using the server-side timestamp is that "after being appended to the log,
all messages will be available on the broker for the retention time", which
is not changeable by clients.
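
The bootstrap case in numbers, as a minimal sketch. is_expired is a made-up
helper, not the broker's retention code, and the 30-day-old row is an
assumed example value.

```python
DAY_MS = 24 * 60 * 60 * 1000


def is_expired(timestamp_ms, now_ms, retention_ms):
    """Time-based retention check: expired once older than the retention time."""
    return now_ms - timestamp_ms > retention_ms


now = 100 * DAY_MS
retention = 7 * DAY_MS
create_time = now - 30 * DAY_MS  # row was written to the DB 30 days ago
log_append_time = now            # row is dumped into Kafka right now

print(is_expired(create_time, now, retention))      # True: deletable on arrival
print(is_expired(log_append_time, now, retention))  # False: kept for 7 days
```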
>
> -Jay

On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:

> Hi folks,
>
> This proposal was previously in KIP-31 and we separated it to KIP-32 per
> Neha and Jay's suggestion.
>
> The proposal is to add the following two timestamps to the Kafka message.
> - CreateTime
> - LogAppendTime
>
> The CreateTime will be set by the producer and will not change after that.
> The LogAppendTime will be set by the broker for purposes such as enforcing
> log retention and log rolling.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
