Bumping up this thread, although most of the discussion was on the discussion thread of KIP-31 :)
I just updated the KIP page to add a detailed solution for the option (Option 3) that does not expose the LogAppendTime to users:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message

The option has a minor change to the fetch request to allow fetching time index entries as well. I kind of like this solution because it's just doing what we need without introducing other things. It would be great to hear your feedback. I can explain more during tomorrow's KIP hangout.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:

> Hi Jay,
>
> I just copied/pasted here your feedback on the timestamp proposal that was in the discussion thread of KIP-31. Please see the replies inline. The main change I made compared with the previous proposal is to add both CreateTime and LogAppendTime to the message.
>
> On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Becket,
> >
> > I was proposing splitting up the KIP just for simplicity of discussion. You can still implement them in one patch. I think otherwise it will be hard to discuss/vote on them, since if you like the offset proposal but not the time proposal, what do you do?
> >
> > Introducing a second notion of time into Kafka is a pretty massive philosophical change, so it kind of warrants its own KIP; I think it isn't just "change message format".
> >
> > WRT time, I think one thing to clarify in the proposal is how MM will have access to set the timestamp. Presumably this will be a new field in ProducerRecord, right? If so, then any user can set the timestamp, right? I'm not sure you answered the questions around how this will work for MM, since when MM retains timestamps from multiple partitions they will then be out of order and in the past (so the max(lastAppendedTimestamp, currentTimeMillis) override you proposed will not work, right?). If we don't do this, then when you set up mirroring the data will all be new and you have the same retention problem you described. Maybe I missed something...?
>
> lastAppendedTimestamp means the timestamp of the message that was last appended to the log.
>
> If a broker is a leader, since it assigns the timestamp by itself, the lastAppendedTimestamp will be its local clock at the time it appended the last message. So if there is no leader migration, max(lastAppendedTimestamp, currentTimeMillis) = currentTimeMillis.
>
> If a broker is a follower, because it keeps the leader's timestamp unchanged, the lastAppendedTimestamp would be the leader's clock when the leader appended that message. The follower keeps track of the lastAppendedTimestamp only in case it becomes leader later on. At that point, it is possible that the timestamp of the last appended message was stamped by the old leader but the new leader's currentTimeMillis < lastAppendedTimestamp. If a new message comes in, instead of stamping it with the new leader's currentTimeMillis, we have to stamp it with lastAppendedTimestamp to avoid the timestamps in the log going backward.
>
> The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the broker-side clock. If MM produces messages carrying different LogAppendTimes from source clusters to the same target cluster, those LogAppendTimes will be ignored and re-stamped by the target cluster.
>
> I added a use case example for mirror maker in KIP-32. There is also a corner case discussion about when we need the max(lastAppendedTimestamp, currentTimeMillis) trick. Could you take a look and see if that answers your question?
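To make the stamping rule above concrete, here is a minimal Java sketch of the idea. It is purely illustrative: the class and method names are invented, and this is not actual broker code.

public class LogAppendTimeAssigner {

    // Timestamp of the message last appended to this partition's log.
    // A follower updates this from replicated messages so the value is
    // already correct if it becomes the leader later on.
    private long lastAppendedTimestamp = -1L;

    // Leader path: stamp an incoming message. The max() guarantees that
    // the LogAppendTime in the log never goes backward, even right after
    // a leader migration where the new leader's clock is behind the old
    // leader's last stamp.
    public synchronized long assignLogAppendTime() {
        long timestamp = Math.max(lastAppendedTimestamp, System.currentTimeMillis());
        lastAppendedTimestamp = timestamp;
        return timestamp;
    }

    // Follower path: keep the leader's timestamp unchanged, only track it
    // in case of a later failover.
    public synchronized void onReplicatedAppend(long leaderTimestamp) {
        lastAppendedTimestamp = leaderTimestamp;
    }
}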
> > My main motivation is that given that both Samza and Kafka streams are doing work that implies a mandatory client-defined notion of time, I really think introducing a different mandatory notion of time in Kafka is going to be quite odd. We should think hard about how client-defined time could work. I'm not sure if it can, but I'm also not sure that it can't. Having both will be odd. Did you chat about this with Yi/Kartik on the Samza side?
>
> I talked with Kartik and realized that it would be useful to have a client timestamp to facilitate use cases like stream processing. I was trying to figure out if we can simply use the client timestamp without introducing the server time. There is some discussion of this in the KIP. The key problems we want to solve here are:
>
> 1. We want log retention and rolling to depend on the server clock.
> 2. We want to make sure the log-associated timestamp is retained when replicas move.
> 3. We want to use the timestamp in some way that allows searching by timestamp.
>
> For 1 and 2, an alternative is to pass the log-associated timestamp through replication, which means we would need a different protocol for replica fetching to pass the log-associated timestamp. It is actually complicated and there could be a lot of corner cases to handle, e.g. if an old leader starts to fetch from the new leader, should it also update all of its old log segment timestamps?
>
> I actually think a client-side timestamp would be better for 3 if we can find a way to make it work. So far I have not been able to convince myself that having only a client-side timestamp would work, mainly because of 1 and 2. There are a few situations I mentioned in the KIP.
>
> > When you are saying it won't work, you are assuming some particular implementation? Maybe that the index is a monotonically increasing set of pointers to the least record with a timestamp larger than the index time? In other words, a search for time X gives the largest offset at which all records are <= X?
>
> It is a promising idea. We can probably have an in-memory index like that, but it might be complicated to have a file on disk like that. Imagine there are two timestamps T0 < T1. We see message Y created at T1 and create an index like [T1->Y]; then we see message X created at T0. Supposedly we should then have the index look like [T0->X, T1->Y]. That is easy to do in memory, but we might have to rewrite the index file completely. Maybe we can have the first entry carry timestamp 0 and only update that first pointer for any out-of-range timestamp, so the index will be [0->X, T1->Y]. Also, the ranges of timestamps in the log segments can overlap with each other. That means we either need to keep a cross-segment index file or we need to check the index file of every log segment.
>
> I separated out the time-based log index into KIP-33 because it can be an independent follow-up feature, as Neha suggested. I will try to make the time-based index work with the client-side timestamp.
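To illustrate the in-memory version of that trick, here is a rough Java sketch. It only shows the idea; the names are invented and the on-disk format question is deliberately ignored.

import java.util.Map;
import java.util.TreeMap;

public class TimeIndexSketch {

    // timestamp -> offset to start scanning from for that timestamp
    private final TreeMap<Long, Long> entries = new TreeMap<>();

    public void onAppend(long timestamp, long offset) {
        if (entries.isEmpty() || timestamp > entries.lastKey()) {
            // Timestamps arriving in order simply append a new entry.
            entries.put(timestamp, offset);
        } else if (timestamp < entries.firstKey()) {
            // Out-of-range timestamp, e.g. message X created at T0 arrives
            // after message Y created at T1 > T0: fold it into a catch-all
            // first entry keyed at 0, turning [T1->Y] into [0->X, T1->Y]
            // instead of rewriting everything after it.
            entries.merge(0L, offset, Math::min);
        }
        // Timestamps that fall between existing entries are left unindexed;
        // a floor lookup falls back to the nearest earlier entry.
    }

    // Search for time t: a floor lookup returns the offset recorded for the
    // largest indexed timestamp <= t, or -1 if there is none.
    public long lookup(long t) {
        Map.Entry<Long, Long> entry = entries.floorEntry(t);
        return entry == null ? -1L : entry.getValue();
    }
}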
> > For retention, I agree with the problem you point out, but I think what you are saying in that case is that you want a size limit too. If you use system time you actually hit the same problem: say you do a full dump of a DB table with a setting of 7 days retention; your retention will actually not get enforced for the first 7 days because the data is "new to Kafka".
>
> I kind of think the size limit here is orthogonal. It is a valid use case where people want to use time-based retention only. In your example, depending on the client timestamp might break the functionality: say it is a bootstrap case where people actually need to read all the data. If we depend on the client timestamp, the data might be deleted instantly when it arrives at the broker. It might be too demanding to expect the broker to understand what people actually want to do with the data coming in. So the guarantee of using the server-side timestamp is that "after being appended to the log, all messages will be available on the broker for the retention time", which is not changeable by clients.
>
> > -Jay
>
> On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>
>> Hi folks,
>>
>> This proposal was previously in KIP-31 and we separated it into KIP-32 per Neha and Jay's suggestion.
>>
>> The proposal is to add the following two timestamps to the Kafka message:
>> - CreateTime
>> - LogAppendTime
>>
>> The CreateTime will be set by the producer and will not change after that. The LogAppendTime will be set by the broker for purposes such as enforcing log retention and log rolling.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
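To put the proposal and the retention guarantee above into code form, here is a toy Java sketch. The field and method names are invented for illustration; the actual message format is what the KIP specifies.

public final class TimestampedMessage {

    public final long createTime;     // set by the producer; never rewritten
    public final long logAppendTime;  // set by the broker when appending

    public TimestampedMessage(long createTime, long logAppendTime) {
        this.createTime = createTime;
        this.logAppendTime = logAppendTime;
    }

    // Retention depends only on the broker-assigned timestamp, so every
    // message stays available for retentionMs after it is appended, no
    // matter how old its CreateTime is (e.g. a bootstrap dump of old data).
    public boolean shouldRetain(long nowMs, long retentionMs) {
        return nowMs - logAppendTime <= retentionMs;
    }
}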