Bumping this thread, although most of the discussion has been on the
discussion thread of KIP-31 :)

I just updated the KIP page to add a detailed solution for the option
(Option 3) that does not expose the LogAppendTime to users.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message

This option makes a minor change to the fetch request to allow fetching a
time index entry as well. I kind of like this solution because it's just
doing what we need without introducing anything else.
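
To give a rough sense of the shape (purely hypothetical; these names are
not from the KIP and this is not the wire format), the idea is that a
fetch could also ask the broker for the time index entry covering the
fetched position:

// Hypothetical sketch only -- illustrative names, not the KIP's protocol.
public final class TimeAwareFetchRequest {
    public final String topic;
    public final int partition;
    public final long fetchOffset;
    public final int maxBytes;
    // New: when true, the response also carries the time index entry
    // (timestamp -> offset) covering the fetched position.
    public final boolean fetchTimeIndexEntry;

    public TimeAwareFetchRequest(String topic, int partition,
                                 long fetchOffset, int maxBytes,
                                 boolean fetchTimeIndexEntry) {
        this.topic = topic;
        this.partition = partition;
        this.fetchOffset = fetchOffset;
        this.maxBytes = maxBytes;
        this.fetchTimeIndexEntry = fetchTimeIndexEntry;
    }
}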

It would be great to hear your feedback. I can explain more during
tomorrow's KIP hangout.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:

> Hi Jay,
>
> I just copy/pasted here your feedback on the timestamp proposal from the
> discussion thread of KIP-31. Please see the replies inline.
> The main change I made compared with the previous proposal is to add both
> CreateTime and LogAppendTime to the message.
>
> On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Beckett,
> >
> > I was proposing splitting up the KIP just for simplicity of discussion.
> > You can still implement them in one patch. I think otherwise it will be
> > hard to discuss/vote on them since if you like the offset proposal but
> > not the time proposal what do you do?
> >
> > Introducing a second notion of time into Kafka is a pretty massive
> > philosophical change, so I think it kind of warrants its own KIP; it
> > isn't just "Change message format".
> >
> > WRT time I think one thing to clarify in the proposal is how MM will
> > have access to set the timestamp? Presumably this will be a new field in
> > ProducerRecord, right? If so then any user can set the timestamp, right?
> > I'm not sure you answered the questions around how this will work for MM
> > since when MM retains timestamps from multiple partitions they will then
> > be out of order and in the past (so the max(lastAppendedTimestamp,
> > currentTimeMillis) override you proposed will not work, right?). If we
> > don't do this then when you set up mirroring the data will all be new
> > and you have the same retention problem you described. Maybe I missed
> > something...?
> lastAppendedTimestamp means the timestamp of the message that was last
> appended to the log.
> If a broker is a leader, since it assigns the timestamp by itself, the
> lastAppendedTimestamp will be its local clock when it appended the last
> message. So if there is no leader migration, max(lastAppendedTimestamp,
> currentTimeMillis) = currentTimeMillis.
> If a broker is a follower, because it keeps the leader's timestamp
> unchanged, the lastAppendedTime would be the leader's clock when it
> appended that message. It keeps track of the lastAppendedTime only in case
> it becomes leader later on. At that point, it is possible that the
> timestamp of the last appended message was stamped by the old leader, but
> the new leader's currentTimeMillis < lastAppendedTime. If a new message
> comes, instead of stamping it with the new leader's currentTimeMillis, we
> have to stamp it with lastAppendedTime to avoid the timestamps in the log
> going backward.
> The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the
> broker-side clock. If MM produces messages with different LogAppendTimes
> from source clusters to the same target cluster, the LogAppendTime will be
> ignored and the messages re-stamped by the target cluster.
> I added a use case example for mirror maker in KIP-32. There is also a
> corner case discussion about when we need the max(lastAppendedTime,
> currentTimeMillis) trick. Could you take a look and see if that answers
> your question?
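>
> As a rough sketch of the stamping rule (illustrative Java; these names
> are not from the patch, and the append path is assumed to run under the
> log lock as usual):
>
> public class LogAppendTimeAssigner {
>     // Carried across the follower -> leader transition, so a newly
>     // elected leader never stamps a timestamp that goes backward
>     // relative to what is already in the log.
>     private volatile long lastAppendedTimestamp = -1L;
>
>     public long assignLogAppendTime() {
>         long now = System.currentTimeMillis();
>         // max(lastAppendedTimestamp, currentTimeMillis): only differs
>         // from "now" when the new leader's clock is behind the old
>         // leader's last stamp.
>         long timestamp = Math.max(lastAppendedTimestamp, now);
>         lastAppendedTimestamp = timestamp;
>         return timestamp;
>     }
> }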
>
> >
> > My main motivation is that given that both Samza and Kafka streams are
> > doing work that implies a mandatory client-defined notion of time, I
> > really think introducing a different mandatory notion of time in Kafka
> > is going to be quite odd. We should think hard about how client-defined
> > time could work. I'm not sure if it can, but I'm also not sure that it
> > can't. Having both will be odd. Did you chat about this with Yi/Kartik
> > on the Samza side?
> I talked with Kartik and realized that it would be useful to have a client
> timestamp to facilitate use cases like stream processing.
> I was trying to figure out if we can simply use the client timestamp
> without introducing the server time. There is some discussion in the KIP.
> The key problems we want to solve here are:
> 1. We want log retention and rolling to depend on the server clock.
> 2. We want to make sure the log-associated timestamp is retained when
> replicas move.
> 3. We want to use the timestamp in some way that allows searching by
> timestamp.
> For 1 and 2, an alternative is to pass the log-associated timestamp
> through replication; that means we need a different protocol for replica
> fetching to pass the log-associated timestamp. It is actually complicated
> and there could be a lot of corner cases to handle, e.g. if an old leader
> starts to fetch from the new leader, should it also update the timestamps
> of all of its old log segments?
> I actually think a client-side timestamp would be better for 3 if we can
> find a way to make it work.
> So far I am not able to convince myself that only having a client-side
> timestamp would work, mainly because of 1 and 2. There are a few
> situations mentioned in the KIP.
> >
> > When you are saying it won't work you are assuming some particular
> > implementation? Maybe that the index is a monotonically increasing set of
> > pointers to the least record with a timestamp larger than the index time?
> > In other words a search for time X gives the largest offset at which all
> > records are <= X?
> It is a promising idea. We probably can have an in-memory index like that,
> but it might be complicated to maintain a file on disk like that. Imagine
> there are two timestamps T0 < T1. We see message Y created at T1 and
> create an index entry [T1->Y]; then we see message X created at T0.
> Supposedly we should now have the index look like [T0->X, T1->Y], which is
> easy to do in memory, but we might have to rewrite the index file
> completely. Maybe we can have the first entry's timestamp be 0, and only
> update the first pointer for any out-of-range timestamp, so the index
> will be [0->X, T1->Y]. Also, the ranges of timestamps in the log segments
> can overlap with each other. That means we either need to keep a
> cross-segment index file or check the index file of every log segment.
> I separated the time-based log index out into KIP-33 because it can be an
> independent follow-up feature, as Neha suggested. I will try to make the
> time-based index work with client-side timestamps.
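>
> To make that in-memory structure concrete, here is a minimal sketch of
> the monotonic index with the first-pointer trick (purely illustrative;
> this is not the KIP-33 design):
>
> import java.util.Map;
> import java.util.concurrent.ConcurrentSkipListMap;
>
> public class TimeIndexSketch {
>     // timestamp -> start offset, kept monotonic so a floor lookup for
>     // time X returns the offset below which all records are <= X.
>     private final ConcurrentSkipListMap<Long, Long> index =
>         new ConcurrentSkipListMap<>();
>
>     public void maybeAppend(long timestamp, long offset) {
>         if (index.isEmpty() || timestamp > index.lastKey()) {
>             index.put(timestamp, offset);
>         } else {
>             // Out-of-range timestamp: only adjust the first entry,
>             // i.e. the [0->X, T1->Y] trick from above.
>             index.merge(0L, offset, Math::min);
>         }
>     }
>
>     // Search for time X: the entry with the largest timestamp <= X.
>     public Long lookup(long targetTimestamp) {
>         Map.Entry<Long, Long> entry = index.floorEntry(targetTimestamp);
>         return entry == null ? null : entry.getValue();
>     }
> }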
> >
> > For retention, I agree with the problem you point out, but I think what
> > you are saying in that case is that you want a size limit too. If you
> > use system time you actually hit the same problem: say you do a full
> > dump of a DB table with a setting of 7 days retention, your retention
> > will actually not get enforced for the first 7 days because the data is
> > "new to Kafka".
> I kind of think the size limit here is orthogonal. It is a valid use case
> where people want to use time-based retention only. In your example,
> depending on the client timestamp might break the functionality; say it
> is a bootstrap case where people actually need to read all the data. If
> we depend on the client timestamp, the data might be deleted as soon as
> it arrives at the broker. It might be too demanding to expect the broker
> to understand what people actually want to do with the data coming in. So
> the guarantee of using the server-side timestamp is that "after being
> appended to the log, all messages will be available on the broker for the
> retention time", which is not changeable by clients.
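>
> As a sketch of that guarantee (illustrative names; a segment-level
> check):
>
> public class RetentionCheckSketch {
>     // A segment becomes eligible for time-based deletion only once
>     // retentionMs has elapsed since the broker-assigned LogAppendTime
>     // of its last message, regardless of the clients' CreateTime.
>     public static boolean shouldDelete(long lastLogAppendTimeMs,
>                                        long retentionMs) {
>         return System.currentTimeMillis() - lastLogAppendTimeMs > retentionMs;
>     }
> }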
> >
> > -Jay
>
> On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>
>> Hi folks,
>>
>> This proposal was previously part of KIP-31; we separated it into KIP-32
>> per Neha and Jay's suggestion.
>>
>> The proposal is to add the following two timestamps to the Kafka message:
>> - CreateTime
>> - LogAppendTime
>>
>> The CreateTime will be set by the producer and will not change after
>> that. The LogAppendTime will be set by the broker for purposes such as
>> enforcing log retention and log rolling.
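>>
>> As a rough illustration of a message carrying both fields (hypothetical
>> Java, not the actual wire format):
>>
>> public final class TimestampedMessage {
>>     public final long createTime;    // set by the producer, then immutable
>>     public final long logAppendTime; // set by the broker on append
>>     public final byte[] key;
>>     public final byte[] value;
>>
>>     public TimestampedMessage(long createTime, long logAppendTime,
>>                               byte[] key, byte[] value) {
>>         this.createTime = createTime;
>>         this.logAppendTime = logAppendTime;
>>         this.key = key;
>>         this.value = value;
>>     }
>> }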
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>
