Hey Kartik,

Yes, I agree exactly with your characterization.
The question is "what is the meaning of retention?" It could mean either:

1. "Retain data that is no more than 7 days old."
2. "Retain data for 7 days from when you get it."

I don't know if either is actually a clear winner. Each is intuitive and easily expressible:

1. "We have the last 7 days of data in Kafka."
2. "You have 7 days to get your data from Kafka from whenever it arrives."

Each has corner cases:

1. May lead to retaining data for too little time in the bootstrap case.
2. May lead to over-retention in the bootstrap case.

Which failure is worse probably depends on who you ask:

1. May lead to not retaining data long enough for a consumer to get it, which the consumer would say is very bad.
2. May lead to running out of disk space, which ops would say is very bad.

-Jay

On Tue, Oct 20, 2015 at 12:02 AM, Kartik Paramasivam <kparamasi...@linkedin.com.invalid> wrote:

> Joel or Becket will probably respond in more detail, but here are my 2c.
>
> From the standpoint of LinkedIn, the suggested proposal works. In essence, max.appenddelay can be used to turn "creationTime" into "logAppendTime". This does mean that at LinkedIn we won't be able to use "creationTime"; however, that might also be fine because we anyways use the timestamp that is set inside the Avro payload.
>
> Keeping LI aside though, it looks like there are two distinct possible goals:
>
> 1. The broker will retain messages for x days after a message shows up at the broker. This behavior would be super deterministic and would never change depending on the contents of the message or anything else.
>
> 2. The client is in "partial" control of how long a message stays in the broker, based on the creationTime stamped by the client.
>
> Although (2) could be a feature in some scenarios, in many scenarios it can be pretty unintuitive and be perceived as an anti-feature. For example, say a mobile client buffered up some messages because the device was offline (maybe on a plane) and then sent them after, say, 23 hours. The messages show up in a Kafka topic with 24-hour retention, and now they get deleted within 1 hour.
>
> Kartik
>
>
> On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Here's my basic take:
> >
> > - I agree it would be nice to have a notion of time baked in, if it were done right.
> > - All the proposals so far seem pretty complex--I think they might make things worse rather than better overall.
> > - I think adding 2x8-byte timestamps to the message is probably a non-starter from a size perspective.
> > - Even if it isn't in the message, having two notions of time that control different things is a bit confusing.
> > - The mechanics of basing retention etc. on log append time when that's not in the log seem complicated.
> >
> > To that end here is a possible 4th option. Let me know what you think.
> >
> > The basic idea is that the message creation time is closest to what the user actually cares about but is dangerous if set wrong. So rather than substitute another notion of time, let's try to ensure the correctness of message creation time by preventing arbitrarily bad message creation times.
> >
> > First, let's see if we can agree that log append time is not something anyone really cares about but rather an implementation detail. The timestamp that matters to the user is when the message occurred (the creation time).
> > The log append time is basically just an approximation to this, on the assumption that the message creation and the message receipt on the server occur pretty close together.
> >
> > But as these values diverge the issue starts to become apparent. Say you set the retention to one week and then mirror data from a topic containing two years of retention. Your intention is clearly to keep the last week, but because the mirroring is appending right now you will keep two years.
> >
> > The reason we like log append time is that we are (justifiably) concerned that in certain situations the creation time may not be trustworthy. This same problem exists on the servers, but there are fewer servers and they just run the Kafka code, so it is less of an issue.
> >
> > There are two possible ways to handle this:
> >
> > 1. Just tell people to add size-based retention. I think this is not entirely unreasonable: we're basically saying we retain data based on the timestamp you give us in the data. If you give us bad data we will retain it for a bad amount of time. If you want to ensure we don't retain "too much" data, define "too much" by setting a size-based retention setting. This is not entirely unreasonable but kind of suffers from a "one bad apple" problem in a very large environment.
> >
> > 2. Prevent bad timestamps. In general we can't say a timestamp is bad. However, the definition we're implicitly using is that we think there is a set of topics/clusters where the creation timestamp should always be "very close" to the log append timestamp. This is true for data sources that have no buffering capability (which at LinkedIn is very common, but is more rare elsewhere). The solution in this case would be to allow a setting along the lines of max.append.delay which checks the creation timestamp against the server time to look for too large a divergence, and then either rejects the message or overrides the timestamp with the server time.
> >
> > So in LI's environment you would configure the clusters used for direct, unbuffered message production (e.g. tracking and metrics local) to enforce a reasonably aggressive timestamp bound (say 10 mins), and all other clusters would just inherit these.
> >
> > The downside of this approach is requiring the special configuration. However, I think in 99% of environments this could be skipped entirely; it's only when the ratio of clients to servers gets so massive that you need to do this. The primary upside is that you have a single authoritative notion of time which is closest to what a user would want and is stored directly in the message.
> >
> > I'm also assuming there is a workable approach for indexing non-monotonic timestamps, though I haven't actually worked that out.
> >
> > -Jay
> >
> > On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >
> > > Bumping up this thread, although most of the discussion was on the discussion thread of KIP-31 :)
> > >
> > > I just updated the KIP page to add a detailed solution for the option (Option 3) that does not expose the LogAppendTime to the user.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
> > >
> > > The option has a minor change to the fetch request to allow fetching a time index entry as well. I kind of like this solution because it's just doing what we need without introducing other things.
> > >
> > > It will be great to see what the feedback is. I can explain more during tomorrow's KIP hangout.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > >
> > > > Hi Jay,
> > > >
> > > > I just copy/pasted here your feedback on the timestamp proposal that was in the discussion thread of KIP-31. Please see the replies inline. The main change I made compared with the previous proposal is to add both CreateTime and LogAppendTime to the message.
> > > >
> > > > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > Hey Beckett,
> > > > >
> > > > > I was proposing splitting up the KIP just for simplicity of discussion. You can still implement them in one patch. I think otherwise it will be hard to discuss/vote on them, since if you like the offset proposal but not the time proposal, what do you do?
> > > > >
> > > > > Introducing a second notion of time into Kafka is a pretty massive philosophical change, so I think it warrants its own KIP; it isn't just "Change message format".
> > > > >
> > > > > WRT time, I think one thing to clarify in the proposal is how MM will have access to set the timestamp. Presumably this will be a new field in ProducerRecord, right? If so then any user can set the timestamp, right? I'm not sure you answered the questions around how this will work for MM, since when MM retains timestamps from multiple partitions they will then be out of order and in the past (so the max(lastAppendedTimestamp, currentTimeMillis) override you proposed will not work, right?). If we don't do this then when you set up mirroring the data will all be new and you have the same retention problem you described. Maybe I missed something...?
> > > >
> > > > lastAppendedTimestamp means the timestamp of the message that was last appended to the log.
> > > >
> > > > If a broker is a leader, since it assigns the timestamp by itself, the lastAppendedTimestamp will be its local clock when it appends the last message. So if there is no leader migration, max(lastAppendedTimestamp, currentTimeMillis) = currentTimeMillis.
> > > >
> > > > If a broker is a follower, because it keeps the leader's timestamp unchanged, the lastAppendedTime would be the leader's clock when it appended that message. The follower keeps track of the lastAppendedTime only in case it becomes leader later on. At that point, it is possible that the timestamp of the last appended message was stamped by the old leader, but the new leader's currentTimeMillis < lastAppendedTime. If a new message comes, instead of stamping it with the new leader's currentTimeMillis, we have to stamp it with lastAppendedTime to avoid the timestamps in the log going backward.
> > > >
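> > > > In rough Java-like pseudo-code, the broker-side stamping I have in mind is something like the sketch below (purely illustrative; the names are made up and this is not the actual patch):
> > > >
> > > >     // Sketch: broker-side LogAppendTime assignment. The max() keeps the
> > > >     // timestamps written to the log from going backward after a leader
> > > >     // change when the new leader's clock is behind the old leader's.
> > > >     class LogAppendTimeAssigner {
> > > >         private long lastAppendedTimestamp = -1L;
> > > >
> > > >         synchronized long nextLogAppendTime() {
> > > >             long timestamp = Math.max(lastAppendedTimestamp, System.currentTimeMillis());
> > > >             lastAppendedTimestamp = timestamp;
> > > >             return timestamp;
> > > >         }
> > > >     }
> > > >
> > > > The point is simply that the stamped value never decreases, even when the new leader's clock does.
> > > >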
> > > > The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the broker-side clock. If MM produces messages with different LogAppendTimes from source clusters to the same target cluster, the LogAppendTime will be ignored and re-stamped by the target cluster.
> > > >
> > > > I added a use case example for mirror maker in KIP-32. Also, there is a corner case discussion about when we need the max(lastAppendedTime, currentTimeMillis) trick. Could you take a look and see if that answers your question?
> > > >
> > > > > My main motivation is that given that both Samza and Kafka Streams are doing work that implies a mandatory client-defined notion of time, I really think introducing a different mandatory notion of time in Kafka is going to be quite odd. We should think hard about how client-defined time could work. I'm not sure if it can, but I'm also not sure that it can't. Having both will be odd. Did you chat about this with Yi/Kartik on the Samza side?
> > > >
> > > > I talked with Kartik and realized that it would be useful to have a client timestamp to facilitate use cases like stream processing. I was trying to figure out if we can simply use the client timestamp without introducing the server time. There is some discussion of this in the KIP.
> > > >
> > > > The key problems we want to solve here are:
> > > >
> > > > 1. We want log retention and rolling to depend on the server clock.
> > > > 2. We want to make sure the log-associated timestamp is retained when replicas move.
> > > > 3. We want to use the timestamp in some way that allows searching by timestamp.
> > > >
> > > > For 1 and 2, an alternative is to pass the log-associated timestamp through replication; that means we need a different protocol for replica fetching to pass the log-associated timestamp. It is actually complicated and there could be a lot of corner cases to handle, e.g. if an old leader starts to fetch from the new leader, should it also update all of its old log segment timestamps? I actually think a client-side timestamp would be better for 3 if we can find a way to make it work. So far I have not been able to convince myself that only having a client-side timestamp would work, mainly because of 1 and 2. There are a few situations I mentioned in the KIP.
> > > >
> > > > > When you are saying it won't work you are assuming some particular implementation? Maybe that the index is a monotonically increasing set of pointers to the least record with a timestamp larger than the index time? In other words a search for time X gives the largest offset at which all records are <= X?
> > > >
> > > > It is a promising idea. We probably can have an in-memory index like that, but it might be complicated to have a file on disk like that. Imagine there are two timestamps T0 < T1. We see message Y created at T1 and create an index like [T1 -> Y]; then we see message X created at T0, and supposedly the index should look like [T0 -> X, T1 -> Y]. That is easy to do in memory, but we might have to rewrite the index file completely.
> > > >
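> > > > To make the in-memory part concrete, a sorted map handles the out-of-order insert trivially. A tiny sketch (purely illustrative, not something I intend to commit):
> > > >
> > > >     import java.util.NavigableMap;
> > > >     import java.util.TreeMap;
> > > >
> > > >     public class TimeIndexSketch {
> > > >         public static void main(String[] args) {
> > > >             // timestamp -> offset of a message with that timestamp
> > > >             NavigableMap<Long, Long> timeIndex = new TreeMap<>();
> > > >             long t0 = 1000L, t1 = 2000L;  // T0 < T1
> > > >             timeIndex.put(t1, 11L);       // message Y at T1: index is [T1 -> Y]
> > > >             timeIndex.put(t0, 10L);       // message X at T0: index is [T0 -> X, T1 -> Y]
> > > >             // Look up the entry for the greatest indexed timestamp <= 1500:
> > > >             System.out.println(timeIndex.floorEntry(1500L).getValue()); // prints 10
> > > >         }
> > > >     }
> > > >
> > > > An append-only index file, on the other hand, would need the T0 entry written in front of the T1 entry that is already on disk, which is the rewrite problem I described above.
> > > >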
> > > > Maybe we can have the first entry with timestamp 0, and only update that first pointer for any out-of-range timestamp, so the index will be [0 -> X, T1 -> Y]. Also, the ranges of timestamps in the log segments can overlap with each other. That means we either need to keep a cross-segment index file or we need to check the index file of every log segment.
> > > >
> > > > I separated out the time-based log index to KIP-33 because it can be an independent follow-up feature, as Neha suggested. I will try to make the time-based index work with client-side timestamps.
> > > >
> > > > > For retention, I agree with the problem you point out, but I think what you are saying in that case is that you want a size limit too. If you use system time you actually hit the same problem: say you do a full dump of a DB table with a setting of 7 days retention, your retention will actually not get enforced for the first 7 days because the data is "new to Kafka".
> > > >
> > > > I kind of think the size limit here is orthogonal. It is a valid use case where people want to use only time-based retention. In your example, depending on the client timestamp might break the functionality - say it is a bootstrap case where people actually need to read all the data. If we depend on the client timestamp, the data might be deleted instantly when it comes to the broker. It might be too demanding to expect the broker to understand what people actually want to do with the data coming in. So the guarantee of using a server-side timestamp is that "after being appended to the log, all messages will be available on the broker for the retention time", which is not changeable by clients.
> > > >
> > > > > -Jay
> > > >
> > > > On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > > >
> > > >> Hi folks,
> > > >>
> > > >> This proposal was previously in KIP-31 and we separated it to KIP-32 per Neha and Jay's suggestion.
> > > >>
> > > >> The proposal is to add the following two timestamps to the Kafka message:
> > > >> - CreateTime
> > > >> - LogAppendTime
> > > >>
> > > >> The CreateTime will be set by the producer and will not change after that.
> > > >> The LogAppendTime will be set by the broker for purposes such as enforcing log retention and log rolling.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin