Jun, Jiangjie,

I am confused about 3) here. If we use "the timestamp of the latest message", doesn't this mean we will also roll the log whenever we receive a message that is delayed by more than the rolling time? Just to clarify, my understanding of "the timestamp of the latest message", for example in the following log, is 1, not 5:
2, 3, 4, 5, 1

Guozhang

On Mon, Dec 14, 2015 at 10:05 PM, Jun Rao <j...@confluent.io> wrote:

1. Hmm, it's more intuitive if the consumer sees the same timestamp whether the messages are compressed or not. When message.timestamp.type=LogAppendTime, we will need to set the timestamp in each message if messages are not compressed, so that the follower can get the same timestamp. So, it seems that we should do the same thing for the inner messages when messages are compressed.

4. I thought that on startup, we restore the timestamp of the latest message by reading from the time index of the last log segment. So, what happens if there are no index entries?

Thanks,

Jun

On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin <becket....@gmail.com> wrote:

Thanks for the explanation, Jun.

1. That makes sense. So maybe we can do the following:
(a) Set the timestamp in the compressed message to the latest timestamp of all its inner messages. This works for both LogAppendTime and CreateTime.
(b) If message.timestamp.type=LogAppendTime, the broker will overwrite all the inner message timestamps to -1 if they are not already set to -1. This is mainly for topics that are using LogAppendTime. Hopefully the producer will set the timestamp to -1 in the ProducerRecord to avoid server-side recompression.

3. I see. That works. So the semantics of log rolling become "roll the log segment if it has been inactive for the rolling interval since the latest message arrived."

4. Yes. If the largest timestamp is in the previous log segment, the time index for the current log segment does not have a valid offset in the current log segment to point to. Maybe in that case we should build an empty time index.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao <j...@confluent.io> wrote:

1. I was thinking more about saving the decompression overhead in the follower. Currently, the follower doesn't decompress the messages. To keep it that way, the outer message needs to include the timestamp of the latest inner message so the follower can build the time index. The simplest thing to do is to change the timestamps in the inner messages if necessary, in which case there will be recompression overhead. However, in the case when the timestamps of the inner messages don't have to be changed (hopefully the more common one), there won't be recompression overhead. In either case, in the leader we always set the timestamp in the outer message to the timestamp of the latest inner message.

3. Basically, in each log segment, we keep track of the timestamp of the latest message. If current time - timestamp of latest message > log rolling interval, we roll a new log segment. So, if messages with later timestamps keep getting added, we only roll new log segments based on size. On the other hand, if no new messages are added to a log, we can force a log roll based on time, which addresses the issue in (b).

4. Hmm, the index is per segment and should only point to positions in the corresponding .log file, not previous ones, right?

Thanks,

Jun
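To make the rolling rule in Jun's (3) concrete, here is a minimal sketch (all names illustrative, not actual broker code). Note that it reads "the timestamp of the latest message" as the largest timestamp seen in the segment; whether it should instead be the timestamp of the last appended message is exactly the ambiguity Guozhang raises at the top of this thread.

    // Sketch of the time-based rolling rule in (3); names are illustrative.
    class RollCheck {
        // With maxTimestampSoFar as the largest timestamp seen in the
        // segment, one delayed message (Guozhang's "2, 3, 4, 5, 1" example)
        // cannot move it backwards and so cannot force an early roll.
        static boolean shouldRoll(long nowMs, long segmentBytes, long maxSegmentBytes,
                                  long maxTimestampSoFar, long rollIntervalMs) {
            if (segmentBytes >= maxSegmentBytes)
                return true;                                // size-based roll
            // Time-based roll fires only when the segment has gone quiet.
            return nowMs - maxTimestampSoFar > rollIntervalMs;
        }
    }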
On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

Thanks a lot for the comments. Please see inline replies.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Becket,
>
> Thanks for the proposal. Looks good overall. A few comments below.
>
> 1. KIP-32 didn't say what timestamp should be set in a compressed message. We probably should set it to the timestamp of the latest message included in the compressed one. This way, during indexing, we don't have to decompress the message.

That is a good point. In normal cases, the broker needs to decompress the message for verification purposes anyway, so building the time index does not add additional decompression. During time index recovery, however, having a timestamp in the compressed message might save the decompression.

Another thing I am thinking is that we should make sure KIP-32 works well with KIP-31, i.e. we don't want to recompress just in order to add a timestamp to messages. With the approach in my last email, the timestamps in the messages will either all be overwritten by the server if message.timestamp.type=LogAppendTime, or not be overwritten at all if message.timestamp.type=CreateTime.

Maybe we can use the timestamp in compressed messages in the following way:
If message.timestamp.type=LogAppendTime, we have to overwrite the timestamps for all the messages. We can simply write the timestamp in the compressed message to avoid recompression.
If message.timestamp.type=CreateTime, we do not need to overwrite the timestamps. We either reject the entire compressed message or just leave the compressed message timestamp as -1.

So the semantics of the timestamp field in a compressed message become: if it is greater than 0, LogAppendTime is used and the timestamp of the inner messages is the compressed message's LogAppendTime; if it is -1, CreateTime is used and the timestamp is in each individual inner message.

This sacrifices the speed of recovery but seems worth it because we avoid recompression.
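A compact sketch of these proposed semantics for the wrapper (compressed) message's timestamp field; the class itself is illustrative, only the -1 convention comes from the proposal above.

    // Sketch of the proposed timestamp semantics on a compressed wrapper
    // message: > 0 means LogAppendTime (applies to all inner messages),
    // -1 means CreateTime (each inner message carries its own timestamp).
    class WrapperTimestamp {
        static final long NO_TIMESTAMP = -1L;

        // Broker side, on append.
        static long stampWrapper(boolean logAppendTime, long brokerNowMs) {
            return logAppendTime ? brokerNowMs : NO_TIMESTAMP;
        }

        // Reader side: effective timestamp of an inner message.
        static long innerTimestamp(long wrapperTs, long innerTs) {
            return wrapperTs > 0 ? wrapperTs : innerTs;
        }
    }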
> 2. In KIP-33, should we make the time-based index interval configurable? Perhaps we can default it to 60 secs, but allow users to configure it to smaller values if they want more precision.

Yes, we can do that.

> 3. In KIP-33, I am not sure if log rolling should be based on the earliest message. This would mean that we will need to roll a log segment every time we get a message delayed by the log rolling time interval. Also, on broker startup, we can get the timestamp of the latest message in a log segment pretty efficiently by just looking at the last time index entry. But getting the earliest timestamp requires a full scan of all log segments, which can be expensive. Previously, there were two use cases for time-based rolling: (a) more accurate time-based indexing and (b) retaining data by time (since the active segment is never deleted). (a) is already solved with a time-based index. For (b), if the retention is based on the timestamp of the latest message in a log segment, perhaps log rolling should be based on that too.

I am not sure how to make log rolling work with the latest timestamp in the current log segment. Do you mean the log rolling can be based on the last log segment's latest timestamp? If so, how do we roll the first segment?

> 4. In KIP-33, I presume the timestamps in the time index will be monotonically increasing. So, if all messages in a log segment have a timestamp less than the largest timestamp in the previous log segment, we will use the latter to index this log segment?

Yes. The timestamps are monotonically increasing. If the largest timestamp in the previous segment is very big, it is possible that the time index of the current segment has only two index entries (inserted at segment creation and roll), both pointing to a message in the previous log segment. This is the corner case I mentioned before, where we should expire the next log segment even before expiring the previous one, just because the largest timestamp is in the previous log segment. In the current approach, we will wait until the previous log segment expires, and then delete both the previous log segment and the next log segment.

> 5. In KIP-32, in the wire protocol, we mention both timestamp and time. They should be consistent.

Will fix the wiki page.

> Jun

On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin <becket....@gmail.com> wrote:

Hey Jay,

Thanks for the comments.

Good point about the actions to take when max.message.time.difference is exceeded. Rejection is a useful behavior, although I cannot think of a use case at LinkedIn at this moment. I think it makes sense to add a configuration.

How about the following configurations?
1. message.timestamp.type=CreateTime/LogAppendTime
2. max.message.time.difference.ms

If message.timestamp.type is set to CreateTime, when the broker receives a message, it will further check max.message.time.difference.ms and will reject the message if the time difference exceeds the threshold.
If message.timestamp.type is set to LogAppendTime, the broker will always stamp the message with the current server time, regardless of the value of max.message.time.difference.ms.

This will make sure the messages on the broker use either CreateTime or LogAppendTime, but not a mixture of both.

What do you think?

Thanks,

Jiangjie (Becket) Qin
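A sketch of how the broker-side handling of these two configurations could look; the class and method names are illustrative, and the exception type is a generic placeholder rather than an actual broker API.

    // Sketch of the two proposed configurations in action; illustrative only.
    class TimestampPolicy {
        enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

        static long onReceive(TimestampType type, long messageTs,
                              long brokerNowMs, long maxDiffMs) {
            if (type == TimestampType.LOG_APPEND_TIME)
                return brokerNowMs;          // always restamp; maxDiffMs is ignored
            // CreateTime: keep the client timestamp but enforce the threshold.
            if (Math.abs(brokerNowMs - messageTs) > maxDiffMs)
                throw new IllegalArgumentException(
                    "timestamp exceeds max.message.time.difference.ms");
            return messageTs;
        }
    }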
On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

That summary of pros and cons sounds about right to me.

There are potentially two actions you could take when max.message.time.difference is exceeded--override it or reject the message entirely. Can we pick one of these or does the action need to be configurable too? (I'm not sure.) The downside of more configuration is that it is more fiddly and has more modes.

I suppose the reason I was thinking of this as a "difference" rather than a hard type was that if you were going to go the reject route you would need some tolerance setting (i.e. if your SLA is that if your timestamp is off by more than 10 minutes I give you an error). I agree with you that having one field that potentially contains a mix of two values is a bit weird.

-Jay

On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin <becket....@gmail.com> wrote:

It looks like the format of the previous email was messed up. Sending it again.

Just to recap, the last proposal Jay made (with some implementation details added) was:

1. Allow the user to stamp the message when producing.

2. When the broker receives a message, it takes a look at the difference between its local time and the timestamp in the message.
  a. If the time difference is within the configurable max.message.time.difference.ms, the server will accept it and append it to the log.
  b. If the time difference is beyond the configured max.message.time.difference.ms, the server will override the timestamp with its current local time and append the message to the log.
  c. The default value of max.message.time.difference.ms would be set to Long.MaxValue.

3. The configurable time difference threshold max.message.time.difference.ms will be a per-topic configuration.

4. The index will be built so that it has the following guarantees.
  a. If the user searches by timestamp:
    - all the messages after that timestamp will be consumed;
    - the user might see earlier messages.
  b. Log retention will take a look at the last entry in the time index file, because the last entry will be the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
  c. Log rolling has to depend on the earliest timestamp. In this case we may need to keep an in-memory timestamp only for the current active log segment. On recovery, we will need to read the active log segment to get the timestamp of the earliest message.

5. The downsides of this proposal are:
  a. The timestamp might not be monotonically increasing.
  b. Log retention might become non-deterministic, i.e. when a message will be deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by the user within a range that depends on the time difference threshold configuration.
  c. The semantics of the timestamp in the messages could be a little vague, because some timestamps come from the producer and some are overwritten by brokers.

6. Although the proposal has some downsides, it gives the user the flexibility to use the timestamp:
  a. If the threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
  b. If the threshold is set to 0, the timestamp in the message is equivalent to LogAppendTime.

This proposal actually allows the user to use either CreateTime or LogAppendTime without introducing two timestamp concepts at the same time. I have updated the wiki for KIP-32 and KIP-33 with this proposal.

One thing I am thinking is that instead of having a time difference threshold, should we simply have a TimestampType configuration? Because in most cases, people will either set the threshold to 0 or to Long.MaxValue. Setting anything in between makes the timestamp in the message meaningless to the user - the user doesn't know whether the timestamp has been overwritten by the brokers.

Any thoughts?

Thanks,

Jiangjie (Becket) Qin
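For 4(b) above, the retention decision reduces to a single lookup per segment. A minimal sketch, assuming a hypothetical lastIndexEntryTs that stands for the timestamp in the segment's last time index entry:

    // Sketch of retention via the last time index entry, per 4(b) above.
    class RetentionCheck {
        // The last entry carries the largest timestamp in the segment, so a
        // single comparison decides the whole segment. Names illustrative.
        static boolean shouldDelete(long lastIndexEntryTs, long nowMs, long retentionMs) {
            // If even the newest message is past retention, everything is.
            return nowMs - lastIndexEntryTs > retentionMs;
        }
    }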
On Mon, Dec 7, 2015 at 10:33 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Bump up this thread.
Thanks,

Jiangjie (Becket) Qin

On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi Jay,

Thanks for such a detailed explanation. I think we are both trying to make CreateTime work for us if possible. To me, "work" means clear guarantees on:
1. Log retention time enforcement.
2. Log rolling time enforcement (this might be less of a concern, as you pointed out).
3. Applications searching for messages by time.

WRT (1), I agree the expectation for log retention might be different depending on who we ask. But my concern is about the level of guarantee we give to the user. My observation is that a clear guarantee to the user is critical regardless of the mechanism we choose. And this is the subtle but important difference between using LogAppendTime and CreateTime.

Let's say the user asks this question: how long will my message stay in Kafka?

If we use LogAppendTime for log retention, the answer is that the message will stay in Kafka for the retention time after the message is produced (to be more precise, upper bounded by log.rolling.ms + log.retention.ms). The user has a clear guarantee, and they can decide whether or not to put the message into Kafka, or how to adjust the retention time according to their requirements.

If we use create time for log retention, the answer would be "it depends". The best answer we can give is "at least retention.ms", because there is no guarantee when the messages will be deleted after that. If a message sits somewhere behind a larger create time, the message might stay longer than expected. But we don't know how much longer, because it depends on the create time. In this case, it is hard for the user to decide what to do.

I am worried about this because a blurry guarantee has bitten us before, e.g. topic creation. We have received many questions like "why is my topic not there after I created it?". I can imagine us receiving similar questions asking "why is my message still there after the retention time has been reached?".
So my understanding is that a clear and solid guarantee is better than a mechanism that works in most cases but occasionally does not.

If we think of the retention guarantee we provide with LogAppendTime, it is not broken as you said, because we are telling the user in the first place that log retention is NOT based on create time.

WRT (3), no matter whether we index on LogAppendTime or CreateTime, the best guarantee we can provide the user is "not missing any message after a certain timestamp". Therefore I actually really like to index on CreateTime, because that is the timestamp we provide to the user, and we can give the solid guarantee. On the other hand, indexing on LogAppendTime while giving the user CreateTime does not provide a solid guarantee when the user searches by timestamp. It only works when LogAppendTime is always no earlier than CreateTime. This is a reasonable assumption, and we can easily enforce it.

With the above, I am not sure we can avoid a server timestamp if we want log retention to work with a clear guarantee. For the search-by-timestamp use case, I really want the index built on CreateTime, but with a reasonable assumption and timestamp enforcement, a LogAppendTime index would also work.

Thanks,

Jiangjie (Becket) Qin
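To put illustrative numbers on the LogAppendTime bound above: with log.rolling.ms set to 1 day and log.retention.ms set to 7 days, a message appended right after its segment is created waits at most 1 day for the segment to roll and at most 7 more days for the rolled segment to expire, so it is deleted no later than roughly 1 + 7 = 8 days after it was produced. Under CreateTime-based retention there is no such fixed bound: a single message with a sufficiently new create time keeps the whole segment alive.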
On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

Let me see if I can address your concerns:

> 1. Let's say we have two source clusters that are mirrored to the same target cluster. For some reason one of the mirror makers for a cluster dies, and after fixing the issue we want to resume mirroring. In this case it is possible that when the mirror maker resumes mirroring, the timestamps of the messages have already gone beyond the acceptable timestamp range on the broker. In order to let those messages go through, we have to bump up the max.append.delay for all the topics on the target broker. This could be painful.

Actually what I was suggesting was different. Here is my observation: clusters/topics directly produced to by applications have a valid assertion that log append time and create time are similar (let's call these "unbuffered"); other clusters/topics, such as those that receive data from a database, a log file, or another Kafka cluster, don't have that assertion - for these "buffered" clusters, data can be arbitrarily late. This means any use of log append time on these buffered clusters is not very meaningful, and create time and log append time "should" be similar on unbuffered clusters, so you can probably use either.

Using log append time on buffered clusters actually results in bad things. If you request the offset for a given time, you don't end up getting data for that time but rather data that showed up at that time. If you try to retain 7 days of data, it may mostly work, but any kind of bootstrapping will result in retaining much more (potentially the whole database contents!).

So what I am suggesting in terms of the use of max.append.delay is that unbuffered clusters would have this set and buffered clusters would not. In other words, in LI terminology, tracking and metrics clusters would have this enforced; aggregate and replica clusters wouldn't.

So you DO have the issue of potentially retaining more data than you need on aggregate clusters if your mirroring skews, but you DON'T need to tweak the setting as you described.

> 2. Let's say in the above scenario we let the messages in. At that point some log segments in the target cluster might have a wide range of timestamps. Like Guozhang mentioned, the log rolling could be tricky, because the first time index entry does not necessarily have the smallest timestamp of all the messages in the log segment. Instead, it is the largest timestamp ever seen. We have to scan the entire log to find the earliest message to see if we should roll.

I think there are two uses for time-based log rolling:
1. Making the offset lookup by timestamp work.
2. Ensuring we don't retain data indefinitely if it is supposed to be purged after 7 days.

But think about these two use cases. (1) is totally obviated by the time=>offset index we are adding, which yields much more granular offset lookups. (2) is actually totally broken if you switch to append time, right? If for security/privacy reasons you want to be sure you only retain 7 days of data, then if the log append and create time diverge you actually violate this requirement.

I think 95% of people care about (1), which is solved in the proposal, and (2) is actually broken today as well as in both proposals.

> 3. Theoretically it is possible that an older log segment contains timestamps that are older than all the messages in a newer log segment. It would be weird that we are supposed to delete the newer log segment before we delete the older log segment.

The index timestamps would always be a lower bound (i.e. the maximum seen at that time), so I don't think that is possible.

> 4. In the bootstrap case, if we reload the data into a Kafka cluster, we have to make sure we configure the topic correctly before we load the data. Otherwise the messages might either be rejected because the timestamps are too old, or deleted immediately because the retention time has been reached.

See (1).

-Jay

On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hey Jay and Guozhang,

Thanks a lot for the reply. So if I understand correctly, Jay's proposal is:

1. Let the client stamp the message create time.
2. The broker builds the index based on the client-stamped message create time.
3. The broker only takes messages whose create time is within current time plus/minus T (T is a configuration, max.append.delay, and could be a topic-level configuration); if the timestamp is out of this range, the broker rejects the message.
4. Because the create times of messages can be out of order, when the broker builds the time-based index it only provides the guarantee that if a consumer starts consuming from the offset returned by searching for timestamp t, they will not miss any message created after t, but they might see some messages created before t.

To build the time-based index, every time the broker needs to insert a new time index entry, the entry would be {Largest_Timestamp_Ever_Seen -> Current_Offset}. This basically means any timestamp larger than Largest_Timestamp_Ever_Seen must come after this offset, because the broker never saw it before. So we don't miss any message with a larger timestamp.

(@Guozhang, in this case, for log retention we only need to look at the last time index entry, because it must hold the largest timestamp ever seen; if that timestamp is overdue, we can safely delete any log segment before that point. So we don't need to scan the log segment file for log retention.)

I assume that we are still going to have the new FetchRequest to allow time index replication for replicas.
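A minimal sketch of this index-building rule and the resulting lookup guarantee, using a simplified in-memory stand-in for the memory-mapped index file (all names illustrative):

    import java.util.Map;
    import java.util.TreeMap;

    // Sketch of the {Largest_Timestamp_Ever_Seen -> Current_Offset} rule.
    class TimeIndexSketch {
        private final TreeMap<Long, Long> entries = new TreeMap<>();
        private long maxTimestampSoFar = -1L;

        // Keys are non-decreasing even if message timestamps are out of order.
        void maybeAppend(long messageTs, long offset) {
            maxTimestampSoFar = Math.max(maxTimestampSoFar, messageTs);
            entries.put(maxTimestampSoFar, offset);
        }

        // Offset to start consuming from so that no message created at or
        // after t is missed; earlier-created messages may still be replayed.
        long offsetFor(long t) {
            Map.Entry<Long, Long> e = entries.lowerEntry(t); // largest ts < t
            return e == null ? 0L : e.getValue();
        }
    }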
I think Jay's main point here is that we don't want to have two timestamp concepts in Kafka, which I agree is a reasonable concern. And I also agree that create time is more meaningful to users than LogAppendTime. But I am not sure making everything based on create time would work in all cases. Here are my questions about this approach:

1. Let's say we have two source clusters that are mirrored to the same target cluster. For some reason one of the mirror makers for a cluster dies, and after fixing the issue we want to resume mirroring. In this case it is possible that when the mirror maker resumes mirroring, the timestamps of the messages have already gone beyond the acceptable timestamp range on the broker. In order to let those messages go through, we have to bump up the max.append.delay for all the topics on the target broker. This could be painful.

2. Let's say in the above scenario we let the messages in. At that point some log segments in the target cluster might have a wide range of timestamps. Like Guozhang mentioned, the log rolling could be tricky, because the first time index entry does not necessarily have the smallest timestamp of all the messages in the log segment. Instead, it is the largest timestamp ever seen. We have to scan the entire log to find the message with the smallest timestamp to see if we should roll.

3. Theoretically it is possible that an older log segment contains timestamps that are older than all the messages in a newer log segment. It would be weird that we are supposed to delete the newer log segment before we delete the older one.

4. In the bootstrap case, if we reload data into a Kafka cluster, we have to make sure we configure the topic correctly before we load the data. Otherwise the messages might either be rejected because the timestamps are too old, or deleted immediately because the retention time has been reached.

I am very concerned about the operational overhead and the ambiguity of guarantees we introduce if we rely purely on CreateTime.

It looks to me that the biggest issue with adopting CreateTime everywhere is that CreateTime can have big gaps. These gaps could be caused by several cases:
[1]. Faulty clients
[2]. Natural delays from different sources
[3]. Bootstrap
[4]. Failure recovery

Jay's alternative proposal solves [1], and perhaps [2] as well if we are able to set a reasonable max.append.delay. But it does not seem to address [3] and [4]. I actually doubt [3] and [4] are solvable, because the CreateTime gap looks unavoidable in those two cases.
Thanks,

Jiangjie (Becket) Qin

On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Just to complete Jay's option, here is my understanding:

1. For log retention: if we want to remove data before time t, we look into the index file of each segment and find the largest timestamp t' < t, find the corresponding offset and start scanning to the end of the segment; if there is no entry with timestamp >= t, we can delete this segment. If a segment's smallest index timestamp is larger than t, we can skip that segment.

2. For log rolling: if we want to start a new segment after time t, we look into the active segment's index file; if the largest timestamp is already > t, we can roll a new segment immediately; if it is < t, we read its corresponding offset and start scanning to the end of the segment; if we find a record whose timestamp > t, we can roll a new segment.

For log rolling we only need to possibly scan a small portion of the active segment, which should be fine; for log retention we may in the worst case end up scanning all segments, but in practice we may skip most of them, since their smallest index timestamp will be larger than t.

Guozhang
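A sketch of the per-segment retention check Guozhang describes in (1). The Segment and Message accessors here are hypothetical stand-ins, not actual broker APIs:

    // Start at the last index entry with timestamp < t and scan to the end
    // of the segment; the segment is deletable only if nothing at or after
    // t turns up.
    interface Message { long timestamp(); }
    interface Segment {
        long smallestIndexTimestamp();
        long offsetOfLargestIndexTimestampBelow(long t);
        Iterable<Message> messagesFrom(long offset);
    }

    class RetentionScan {
        static boolean canDelete(Segment segment, long t) {
            if (segment.smallestIndexTimestamp() > t)
                return false;                 // entire segment is newer; skip it
            long start = segment.offsetOfLargestIndexTimestampBelow(t);
            for (Message m : segment.messagesFrom(start))
                if (m.timestamp() >= t)
                    return false;             // found data that must be kept
            return true;
        }
    }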
On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <j...@confluent.io> wrote:

I think it should be possible to index out-of-order timestamps. The timestamp index would be similar to the offset index - a memory-mapped file appended to as part of the log append - but would have the format

  timestamp offset

The timestamp entries would be monotonic and, as with the offset index, would occur no more often than every 4K (or some configurable threshold to keep the index small - actually for timestamps it could probably be much sparser than 4K).

A search for a timestamp t yields an offset o before which no message has a timestamp >= t. In other words, if you read the log starting at o, you are guaranteed not to miss any messages occurring at t or later, though you may get many from before t (due to out-of-orderness). Unlike the offset index, this bound doesn't really have to be tight (i.e. there is probably no need to go search the log itself, though you could).

-Jay

On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <j...@confluent.io> wrote:

Here's my basic take:
- I agree it would be nice to have a notion of time baked in, if it were done right
- All the proposals so far seem pretty complex--I think they might make things worse rather than better overall
- I think adding 2x8-byte timestamps to the message is probably a non-starter from a size perspective
- Even if it isn't in the message, having two notions of time that control different things is a bit confusing
- The mechanics of basing retention etc. on log append time when that's not in the log seem complicated

To that end, here is a possible 4th option. Let me know what you think.
The basic idea is that the message creation time is closest to what the user actually cares about, but is dangerous if set wrong. So rather than substitute another notion of time, let's try to ensure the correctness of the message creation time by preventing arbitrarily bad creation times.

First, let's see if we can agree that log append time is not something anyone really cares about but rather an implementation detail. The timestamp that matters to the user is when the message occurred (the creation time). The log append time is basically just an approximation to this, on the assumption that the message creation and the message receipt on the server occur pretty close together.

But as these values diverge, the issue starts to become apparent. Say you set the retention to one week and then mirror data from a topic containing two years of retention. Your intention is clearly to keep the last week, but because the mirroring is appending right now, you will keep two years.

The reason we like log append time is that we are (justifiably) concerned that in certain situations the creation time may not be trustworthy. This same problem exists on the servers, but there are fewer servers and they just run the Kafka code, so it is less of an issue.

There are two possible ways to handle this:

1. Just tell people to add size-based retention. I think this is not entirely unreasonable; we're basically saying we retain data based on the timestamp you give us in the data.
If you give us bad data, we will retain it for a bad amount of time. If you want to ensure we don't retain "too much" data, define "too much" by setting a size-based retention setting. This is not entirely unreasonable, but it kind of suffers from a "one bad apple" problem in a very large environment.

2. Prevent bad timestamps. In general we can't say a timestamp is bad. However, the definition we're implicitly using is that we think there is a set of topics/clusters where the creation timestamp should always be "very close" to the log append timestamp. This is true for data sources that have no buffering capability (which at LinkedIn is very common, but is rarer elsewhere). The solution in this case would be to allow a setting along the lines of max.append.delay which checks the creation timestamp against the server time to look for too large a divergence. The action would be either to reject the message or to override its timestamp with the server time.

So in LI's environment you would configure the clusters used for direct, unbuffered message production (e.g. tracking and metrics local) to enforce a reasonably aggressive timestamp bound (say 10 mins), and all other clusters would just inherit the timestamps.

The downside of this approach is requiring the special configuration. However, I think in 99% of environments this could be skipped entirely; it's only when the ratio of clients to servers gets massive that you need to do this.
The primary upside is that you have a single authoritative notion of time which is closest to what a user would want and is stored directly in the message.

I'm also assuming there is a workable approach for indexing non-monotonic timestamps, though I haven't actually worked that out.

-Jay

On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Bumping up this thread, although most of the discussion was on the discussion thread of KIP-31 :)

I just updated the KIP page to add a detailed solution for the option (Option 3) that does not expose LogAppendTime to the user:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message

The option has a minor change to the fetch request to allow fetching time index entries as well. I kind of like this solution because it's just doing what we need without introducing other things.

It would be great to hear your feedback. I can explain more during tomorrow's KIP hangout.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi Jay,

I just copy/pasted your feedback on the timestamp proposal here from the discussion thread of KIP-31. Please see the replies inline. The main change I made compared with the previous proposal is to add both CreateTime and LogAppendTime to the message.
On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Becket,
>
> I was proposing splitting up the KIP just for simplicity of discussion. You can still implement them in one patch. I think otherwise it will be hard to discuss/vote on them, since if you like the offset proposal but not the time proposal, what do you do?
>
> Introducing a second notion of time into Kafka is a pretty massive philosophical change, so it kind of warrants its own KIP; it isn't just "change message format".
>
> WRT time, I think one thing to clarify in the proposal is how MM will have access to set the timestamp. Presumably this will be a new field in ProducerRecord, right? If so, then any user can set the timestamp, right? I'm not sure you answered the questions around how this will work for MM, since when MM retains timestamps from multiple partitions they will then be out of order and in the past (so the max(lastAppendedTimestamp, currentTimeMillis) override you proposed will not work, right?). If we don't do this, then when you set up mirroring the data will all be new and you have the same retention problem you described. Maybe I missed something...?

lastAppendedTimestamp means the timestamp of the message that was last appended to the log.
If a broker is a leader, since it assigns the timestamp by itself, the lastAppendedTimestamp will be its local clock reading when it appended the last message. So if there is no leader migration, max(lastAppendedTimestamp, currentTimeMillis) = currentTimeMillis.

If a broker is a follower, because it keeps the leader's timestamp unchanged, the lastAppendedTimestamp would be the leader's clock reading when it appended that message. The follower keeps track of the lastAppendedTimestamp only in case it becomes leader later on. At that point, it is possible that the timestamp of the last appended message was stamped by the old leader, but the new leader's currentTimeMillis < lastAppendedTimestamp. If a new message comes in, instead of stamping it with the new leader's currentTimeMillis, we have to stamp it with lastAppendedTimestamp to avoid the timestamps in the log going backward.

The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the broker-side clock. If MM produces messages with different LogAppendTimes from source clusters to the same target cluster, the LogAppendTime will be ignored and re-stamped by the target cluster.

I added a use case example for mirror maker in KIP-32. There is also a corner case discussion about when we need the max(lastAppendedTimestamp, currentTimeMillis) trick. Could you take a look and see if that answers your question?
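Just to summarize the stamping rule above in code, here is a minimal sketch; the class and method names are made up for illustration, and this is not the actual broker code:

    // Sketch of the leader-side stamping rule described above.
    public class LogAppendClock {
        private long lastAppendedTimestamp = -1L;

        // Called by the leader for each appended message. The max() keeps
        // the LogAppendTime in the log from going backward, even if this
        // broker just became leader and its clock is behind the old
        // leader's clock.
        public synchronized long nextLogAppendTime() {
            lastAppendedTimestamp = Math.max(lastAppendedTimestamp,
                                             System.currentTimeMillis());
            return lastAppendedTimestamp;
        }

        // A follower keeps the leader's timestamp unchanged, but still
        // tracks it in case this broker becomes the leader later on.
        public synchronized void trackFollowerAppend(long leaderTimestamp) {
            lastAppendedTimestamp = Math.max(lastAppendedTimestamp,
                                             leaderTimestamp);
        }
    }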
> My main motivation is that given that both Samza and Kafka streams are doing work that implies a mandatory client-defined notion of time, I really think introducing a different mandatory notion of time in Kafka is going to be quite odd. We should think hard about how client-defined time could work. I'm not sure if it can, but I'm also not sure that it can't. Having both will be odd. Did you chat about this with Yi/Kartik on the Samza side?

I talked with Kartik and realized that it would be useful to have a client timestamp to facilitate use cases like stream processing. I was trying to figure out if we can simply use the client timestamp without introducing the server time. There is some discussion of this in the KIP. The key problems we want to solve here are:
1. We want log retention and rolling to depend on the server clock.
2. We want to make sure the log-associated timestamp is retained when replicas move.
3. We want to use the timestamp in some way that allows searching by timestamp.

For 1 and 2, an alternative is to pass the log-associated timestamp through replication; that means we need a different protocol for replica fetching to pass the log-associated timestamp. It is actually complicated and there could be a lot of corner cases to handle, e.g. if an old leader starts to fetch from the new leader, should it also update the timestamps of all of its old log segments?
I actually think a client-side timestamp would be better for 3 if we can find a way to make it work. So far I am not able to convince myself that having only a client-side timestamp would work, mainly because of 1 and 2. There are a few situations I mentioned in the KIP.

> When you are saying it won't work, are you assuming some particular implementation? Maybe that the index is a monotonically increasing set of pointers to the least record with a timestamp larger than the index time? In other words, a search for time X gives the largest offset at which all records are <= X?

It is a promising idea. We can probably have an in-memory index like that, but it might be complicated to maintain a file on disk like that. Imagine there are two timestamps T0 < T1. We see message Y created at T1 and create an index like [T1->Y]; then we see message X created at T0. Supposedly we should then have an index that looks like [T0->X, T1->Y]. That is easy to do in memory, but on disk we might have to rewrite the index file completely. Maybe we can have the first entry with timestamp 0 and only update that first pointer for any out-of-range timestamp, so the index will be [0->X, T1->Y]. Also, the ranges of timestamps in the log segments can overlap with each other. That means we either need to keep a cross-segment index file or we need to check the index file of every log segment.
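Here is an in-memory sketch of that idea, just to illustrate; the class name is made up, and the hard part (the on-disk format) is exactly what this glosses over:

    import java.util.Map;
    import java.util.TreeMap;

    // In-memory sketch of the mostly-monotonic time index discussed above.
    public class TimeIndexSketch {
        // Indexed timestamp -> offset of the first message at or after it.
        private final TreeMap<Long, Long> entries = new TreeMap<>();

        public void maybeAppend(long timestamp, long offset) {
            if (entries.isEmpty() || timestamp > entries.lastKey()) {
                entries.put(timestamp, offset); // normal, in-order case
            } else {
                // Out-of-range timestamp: only update the catch-all first
                // entry at time 0, so [T1->Y] becomes [0->X, T1->Y].
                entries.merge(0L, offset, Math::min);
            }
        }

        // Search for time X: the entry with the largest indexed timestamp
        // <= X, i.e. where a scan for messages at or after X should start.
        public Long lookup(long targetTime) {
            Map.Entry<Long, Long> e = entries.floorEntry(targetTime);
            return e == null ? null : e.getValue();
        }
    }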
I separated out the time-based log index into KIP-33 because it can be an independent follow-up feature, as Neha suggested. I will try to make the time-based index work with client-side timestamps.

> For retention, I agree with the problem you point out, but I think what you are saying in that case is that you want a size limit too. If you use system time you actually hit the same problem: say you do a full dump of a DB table with a setting of 7 days retention; your retention will actually not get enforced for the first 7 days because the data is "new to Kafka".

I kind of think the size limit here is orthogonal. It is a valid use case for people to want time-based retention only. In your example, depending on the client timestamp might break the functionality: say it is a bootstrap case where people actually need to read all the data. If we depend on the client timestamp, the data might be deleted instantly when it arrives at the broker. It might be too demanding to expect the broker to understand what people actually want to do with the data coming in. So the guarantee of using the server-side timestamp is that "after being appended to the log, all messages will be available on the broker for the retention time", which is not changeable by clients.
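To spell out that guarantee, here is a minimal sketch of a retention check that looks only at the broker-assigned timestamp (illustrative only; not the actual log manager code, and the names are made up):

    // Sketch: with server-side timestamps a segment only becomes
    // deletable once the retention time has passed since its last
    // message was appended, regardless of the clients' CreateTime.
    public class RetentionCheck {
        public static boolean eligibleForDeletion(long segmentLastAppendTime,
                                                  long retentionMs,
                                                  long nowMs) {
            return nowMs - segmentLastAppendTime > retentionMs;
        }
    }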
> -Jay

On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:

> Hi folks,
>
> This proposal was previously in KIP-31 and we separated it into KIP-32 per Neha and Jay's suggestion.
>
> The proposal is to add the following two timestamps to the Kafka message:
> - CreateTime
> - LogAppendTime
>
> The CreateTime will be set by the producer and will not change after that. The LogAppendTime will be set by the broker for purposes such as enforcing log retention and log rolling.
>
> Thanks,
>
> Jiangjie (Becket) Qin

--
-- Guozhang