My hesitation about the changed protocol is this: if topic configuration will be returned in the topic metadata, I think the current protocol makes more sense. The timestamp type is a topic-level setting, so we don't need to put it into each message. That assumes changing the timestamp type of a topic rarely happens, and that if it is ever needed, the existing data should be wiped out.
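The topic-level alternative preferred here (the "current proposal") can be sketched as follows. This is a minimal sketch, assuming the producer and consumer can learn the topic's timestamp type from topic metadata, which the thread notes is not returned yet. `TimestampType`, `inner_timestamps` and `resolve_timestamp` are hypothetical names, not Kafka's client API.

```python
from enum import Enum
from typing import List

NO_TIMESTAMP = -1  # inner-message sentinel used by the "current proposal"


class TimestampType(Enum):
    CREATE_TIME = "CreateTime"
    LOG_APPEND_TIME = "LogAppendTime"


def inner_timestamps(create_times: List[int], topic_type: TimestampType) -> List[int]:
    """Producer side (hypothetical): pick inner-message timestamps from the
    topic-level setting so the broker never has to overwrite them, which
    would force it to re-compress the batch."""
    if topic_type is TimestampType.LOG_APPEND_TIME:
        return [NO_TIMESTAMP] * len(create_times)
    return list(create_times)


def resolve_timestamp(inner_ts: int, wrapper_ts: int) -> int:
    """Consumer side: -1 on an inner message means 'use the wrapper timestamp'."""
    return wrapper_ts if inner_ts == NO_TIMESTAMP else inner_ts


# LogAppendTime topic: inner messages carry -1 and the broker stamps the wrapper.
batch = inner_timestamps([1000, 1005], TimestampType.LOG_APPEND_TIME)
print([resolve_timestamp(ts, wrapper_ts=2000) for ts in batch])  # [2000, 2000]
```

No per-message type bit is needed here, because both sides read the type from the topic configuration.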
Thanks,
Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 2:07 PM, Becket Qin <becket....@gmail.com> wrote:

Bump up this thread per discussion on the KIP hangout.

During the implementation of the KIP, Guozhang raised another proposal on how to indicate the timestamp type used by messages, so we want to see people's opinions on this proposal.

The current and the new proposals differ only for messages that are a) compressed and b) using LogAppendTime.

For compressed messages using LogAppendTime, the current proposal handles timestamps as follows:
1. When a producer produces the messages, it tries to set the timestamp of the inner messages to -1 if it knows LogAppendTime is used.
2. When a broker receives the messages, it overwrites the timestamps of the inner messages to -1 if needed and writes the server time to the wrapper message. The broker has to re-compress if any inner message timestamp is overwritten.
3. When a consumer receives the messages, it sees that the inner message timestamp is -1, so the wrapper message timestamp is used.

Implementation-wise, this proposal requires the producer to set the inner message timestamps correctly to avoid broker-side re-compression. To do that, the short-term solution is to let the producer infer the timestamp type from the ProduceResponse returned by the broker and set the correct timestamp afterwards. This means the first few batches will still need re-compression on the broker. The long-term solution is to have the producer get the topic configuration during the metadata update.

The proposed modification is:
1. When a producer produces the messages, it always uses CreateTime.
2. When a broker receives the messages, it ignores the inner message timestamps. It simply sets a timestamp type attribute bit of the wrapper message to 1 and sets the timestamp of the wrapper message to the server time.
(The broker also sets the timestamp type attribute bit accordingly for non-compressed messages using LogAppendTime.)
3. When a consumer receives the messages, it checks the timestamp type attribute bit of the wrapper message. If it is set to 1, the inner message's timestamp is ignored and the wrapper message's timestamp is used.

This approach uses an extra attribute bit. The good thing about the modified protocol is that consumers will be able to know the timestamp type, and re-compression on the broker side is completely avoided no matter what value the producer sends. In this approach, however, the inner messages will carry wrong timestamps.

We want to see if people have concerns about the modified approach.

Thanks,

Jiangjie (Becket) Qin

On Tue, Dec 15, 2015 at 11:45 AM, Becket Qin <becket....@gmail.com> wrote:

Jun,

1. I agree it would be nice to have the timestamps used in a unified way. My concern is this: if we let the server change the timestamps of the inner messages for LogAppendTime, every user of LogAppendTime will always pay the re-compression penalty. Using LogAppendTime would then make KIP-31 in vain.

4. If there are no entries in the log segment's time index, we can read from the time index of the previous log segment. Suppose no entry is available after we search back to the earliest log segment. That means all the previous log segments with a valid time index entry have been deleted. In that case there should be only one log segment left, the active log segment, and we can simply set the latest timestamp to 0.

Guozhang,

Sorry for the confusion. By "the timestamp of the latest message" I actually meant "the timestamp of the message with the largest timestamp". So in your example the "latest message" is 5.
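Becket's clarification that the "latest message" is the one with the largest timestamp can be sketched together with his point 4 on restoring it at startup. This is a minimal sketch. It assumes each time index is modelled as a list of `(timestamp, offset)` entries with increasing timestamps, with segments ordered oldest first. The function names are hypothetical.

```python
from typing import List, Tuple

# Hypothetical model: a time index is a list of (timestamp, offset) entries
# whose timestamps only increase; segments are ordered oldest first.
TimeIndex = List[Tuple[int, int]]


def latest_message_timestamp(timestamps: List[int]) -> int:
    """'Latest message' in this thread: the largest timestamp, not the last appended."""
    return max(timestamps)


def restore_latest_timestamp(segment_indexes: List[TimeIndex]) -> int:
    """On startup, walk back from the newest segment to the first time index
    that has an entry; if none remains, fall back to 0."""
    for index in reversed(segment_indexes):
        if index:
            return index[-1][0]  # the last entry holds the largest timestamp
    return 0


print(latest_message_timestamp([2, 3, 4, 5, 1]))             # 5
print(restore_latest_timestamp([[(100, 0), (250, 42)], []]))  # 250
print(restore_latest_timestamp([[]]))                         # 0
```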
Thanks,

Jiangjie (Becket) Qin

On Tue, Dec 15, 2015 at 10:02 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Jun, Jiangjie,

I am confused about 3) here. If we use "the timestamp of the latest message", doesn't this mean we will also roll the log whenever we receive a message delayed by the rolling time? Just to clarify: for the following log, my understanding of "the timestamp of the latest message" is 1, not 5:

2, 3, 4, 5, 1

Guozhang

On Mon, Dec 14, 2015 at 10:05 PM, Jun Rao <j...@confluent.io> wrote:

1. Hmm, it's more intuitive if the consumer sees the same timestamp whether or not the messages are compressed. When message.timestamp.type=LogAppendTime, we will need to set the timestamp in each message if the messages are not compressed, so that the follower gets the same timestamp. So it seems that we should do the same for the inner messages when the messages are compressed.

4. I thought that on startup we restore the timestamp of the latest message by reading from the time index of the last log segment. So what happens if there are no index entries?

Thanks,

Jun

On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin <becket....@gmail.com> wrote:

Thanks for the explanation, Jun.

1. That makes sense. So maybe we can do the following:
(a) Set the timestamp in the compressed message to the latest timestamp of all its inner messages. This works for both LogAppendTime and CreateTime.
(b) If message.timestamp.type=LogAppendTime, the broker will overwrite all the inner message timestamps to -1 if they are not already -1. This is mainly for topics that are using LogAppendTime.
Hopefully the producer will set the timestamp to -1 in the ProducerRecord to avoid server-side re-compression.

3. I see. That works. So the semantics of log rolling become "roll out the log segment if it has been inactive since the latest message arrived."

4. Yes. If the largest timestamp is in the previous log segment, the time index for the current log segment does not have a valid offset in the current log segment to point to. Maybe in that case we should build an empty log index.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao <j...@confluent.io> wrote:

1. I was thinking more about saving the decompression overhead in the follower. Currently, the follower doesn't decompress the messages. To keep it that way, the outer message needs to include the timestamp of the latest inner message so that the follower can build the time index. The simplest thing to do is to change the timestamps in the inner messages if necessary, in which case there will be the re-compression overhead. However, when the timestamps of the inner messages don't have to be changed (hopefully the more common case), there won't be any re-compression overhead. In either case, the leader always sets the timestamp in the outer message to the timestamp of the latest inner message.

3. Basically, in each log segment, we keep track of the timestamp of the latest message. If (current time - timestamp of the latest message) > log rolling interval, we roll a new log segment. So if messages with later timestamps keep getting added, we only roll new log segments based on size.
On the other hand, if no new messages are added to a log, we can force a log roll based on time, which addresses the issue in (b).

4. Hmm, the index is per segment and should only point to positions in the corresponding .log file, not previous ones, right?

Thanks,

Jun

On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

Thanks a lot for the comments. Please see the inline replies.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Becket,
>
> Thanks for the proposal. Looks good overall. A few comments below.
>
> 1. KIP-32 didn't say what timestamp should be set in a compressed message. We probably should set it to the timestamp of the latest message included in the compressed one. This way, during indexing, we don't have to decompress the message.

That is a good point. In normal cases, the broker needs to decompress the message for verification purposes anyway, so building the time index does not add any extra decompression. During time index recovery, however, having a timestamp in the compressed message might save the decompression.

Another thing I am thinking is that we should make sure KIP-32 works well with KIP-31. That is, we don't want to re-compress in order to add timestamps to messages.
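Jun's point 1 can be sketched as follows. The leader stamps the wrapper with the timestamp of the latest inner message, so the follower can add a time index entry from the wrapper alone, without decompressing. This is a minimal sketch, not Kafka's broker code. `Message`, `Wrapper`, `wrap` and `index_wrapper` are hypothetical, and taking the wrapper offset as the last inner offset is an assumption.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Message:
    offset: int
    timestamp: int


@dataclass
class Wrapper:
    offset: int            # assumption: offset of the last inner message
    timestamp: int
    inner: List[Message]   # stands in for the compressed payload


def wrap(inner: List[Message]) -> Wrapper:
    """Leader side: stamp the wrapper with the largest inner timestamp."""
    return Wrapper(offset=inner[-1].offset,
                   timestamp=max(m.timestamp for m in inner),
                   inner=inner)


def index_wrapper(time_index: List[Tuple[int, int]], w: Wrapper) -> None:
    """Follower side: reads only the wrapper header, never w.inner, so no
    decompression is needed. Entries are kept monotonically increasing."""
    if not time_index or w.timestamp > time_index[-1][0]:
        time_index.append((w.timestamp, w.offset))


time_index: List[Tuple[int, int]] = []
index_wrapper(time_index, wrap([Message(10, 500), Message(11, 900), Message(12, 700)]))
index_wrapper(time_index, wrap([Message(13, 800)]))  # 800 < 900: no new entry
print(time_index)  # [(900, 12)]
```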
Taking the approach in my last email, the timestamps in the messages will either all be overwritten by the server if message.timestamp.type=LogAppendTime, or none of them will be overwritten if message.timestamp.type=CreateTime.

Maybe we can use the timestamp in compressed messages in the following way:
If message.timestamp.type=LogAppendTime, we have to overwrite the timestamps of all the messages. We can simply write the timestamp in the compressed message to avoid re-compression.
If message.timestamp.type=CreateTime, we do not need to overwrite the timestamps. We either reject the entire compressed message or just leave the compressed message timestamp as -1.

So the semantics of the timestamp field in the compressed message become the following. If it is greater than 0, LogAppendTime is used, and the timestamp of the inner messages is the compressed message's LogAppendTime. If it is -1, CreateTime is used, and the timestamp is in each individual inner message.

This sacrifices recovery speed, but it seems worth it because we avoid re-compression.

> 2. In KIP-33, should we make the time-based index interval configurable? Perhaps we can default it to 60 secs, but allow users to configure smaller values if they want more precision.

Yes, we can do that.

> 3. In KIP-33, I am not sure if log rolling should be based on the earliest message.
> This would mean that we will need to roll a log segment every time we get a message delayed by the log rolling time interval. Also, on broker startup, we can get the timestamp of the latest message in a log segment pretty efficiently by just looking at the last time index entry. But getting the earliest timestamp requires a full scan of all log segments, which can be expensive. Previously, there were two use cases of time-based rolling: (a) more accurate time-based indexing and (b) retaining data by time (since the active segment is never deleted). (a) is already solved with a time-based index. For (b), if the retention is based on the timestamp of the latest message in a log segment, perhaps log rolling should be based on that too.

I am not sure how to make log rolling work with the latest timestamp in the current log segment. Do you mean the log rolling can be based on the last log segment's latest timestamp? If so, how do we roll out the first segment?

> 4. In KIP-33, I presume the timestamp in the time index will be monotonically increasing. So, if all messages in a log segment have a timestamp less than the largest timestamp in the previous log segment, we will use the latter to index this log segment?

Yes. The timestamps are monotonically increasing.
If the largest timestamp in the previous segment is very large, the time index of the current segment may have only two index entries (inserted during segment creation and roll-out), both pointing to a message in the previous log segment. This is the corner case I mentioned before: we would need to expire the next log segment even before expiring the previous log segment, just because the largest timestamp is in the previous log segment. In the current approach, we will wait until the previous log segment expires and then delete both the previous and the next log segment.

> 5. In KIP-32, in the wire protocol, we mention both timestamp and time. They should be consistent.

Will fix the wiki page.

> Jun

On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin <becket....@gmail.com> wrote:

Hey Jay,

Thanks for the comments.

Good point about the action to take when max.message.time.difference is exceeded. Rejection is a useful behavior, although I cannot think of a use case at LinkedIn at the moment. I think it makes sense to add a configuration.

How about the following configurations?
1. message.timestamp.type=CreateTime/LogAppendTime
2. max.message.time.difference.ms

If message.timestamp.type is set to CreateTime, the broker will further check max.message.time.difference.ms when it receives a message. It will reject the message if the time difference exceeds the threshold.
If message.timestamp.type is set to LogAppendTime, the broker will always stamp the message with the current server time, regardless of the value of max.message.time.difference.ms.

This will make sure the messages on the broker use either CreateTime or LogAppendTime, but not a mixture of both.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

That summary of pros and cons sounds about right to me.

There are potentially two actions you could take when max.message.time.difference is exceeded: override it or reject the message entirely. Can we pick one of these, or does the action need to be configurable too? (I'm not sure.) The downside of more configuration is that it is more fiddly and has more modes.

I suppose I was thinking of this as a "difference" rather than a hard type for a reason. If you were going to go with the reject mode, you would need some tolerance setting (i.e. if your SLA is that if your timestamp is off by more than 10 minutes I give you an error). I agree with you that having one field that potentially contains a mix of two values is a bit weird.

-Jay

On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin <becket....@gmail.com> wrote:

It looks like the format of the previous email was messed up. Sending it again.

Just to recap, the last proposal Jay made (with some implementation details added) was:

1. Allow users to stamp the message when producing.

2. When the broker receives a message, it looks at the difference between its local time and the timestamp in the message.
   a. If the time difference is within a configurable max.message.time.difference.ms, the server will accept it and append it to the log.
   b. If the time difference is beyond the configured max.message.time.difference.ms, the server will override the timestamp with its current local time and append the message to the log.
   c. The default value of max.message.time.difference would be set to Long.MaxValue.

3. The configurable time difference threshold max.message.time.difference.ms will be a per-topic configuration.
4. The index will be built so that it has the following guarantees.
   a. If a user searches by timestamp:
      - all the messages after that timestamp will be consumed;
      - the user might see earlier messages.
   b. Log retention will look at the last entry in the time index file, because the last entry will be the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
   c. Log rolling has to depend on the earliest timestamp. In this case we may need to keep an in-memory timestamp only for the current active log. On recovery, we will need to read the active log segment to get the timestamp of the earliest message.

5. The downsides of this proposal are:
   a. The timestamp might not be monotonically increasing.
   b. Log retention might become non-deterministic. That is, when a message will be deleted now depends on the timestamps of the other messages in the same log segment. Those timestamps are provided by users, within a range that depends on the time difference threshold configuration.
   c. The semantic meaning of the timestamps in the messages could be a little vague, because some of them come from the producer and some are overwritten by brokers.

6. Although the proposal has some downsides, it gives users flexibility in how to use the timestamp.
   a. If the threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
   b. If the threshold is set to 0, the timestamp in the message is equivalent to LogAppendTime.

This proposal actually allows users to use either CreateTime or LogAppendTime without introducing two timestamp concepts at the same time. I have updated the wiki for KIP-32 and KIP-33 with this proposal.

One thing I am thinking is that instead of having a time difference threshold, should we simply have a TimestampType configuration? In most cases, people will set the threshold either to 0 or to Long.MaxValue. Setting anything in between will make the timestamp in the message meaningless to users, since they don't know whether the timestamp has been overwritten by the brokers.

Any thoughts?
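Steps 2a to 2c and the two limiting cases in item 6 of the recap can be checked with a small sketch. This is a minimal sketch of the proposed broker behaviour, not Kafka's implementation. `stamp_on_append` is a hypothetical name, and the recap does not say whether "difference" is signed; treating it as an absolute value is an assumption.

```python
LONG_MAX = 2**63 - 1  # Java's Long.MAX_VALUE, the proposed default threshold


def stamp_on_append(msg_ts: int, broker_now: int, max_diff_ms: int = LONG_MAX) -> int:
    """Keep the producer's timestamp if it is within max_diff_ms of the broker
    clock (step 2a); otherwise override it with broker time (step 2b).
    Assumption: the 'difference' is taken as an absolute value."""
    if abs(broker_now - msg_ts) <= max_diff_ms:
        return msg_ts
    return broker_now


now = 1_000_000
print(stamp_on_append(400_000, now))           # 400000: threshold Long.MaxValue acts like CreateTime
print(stamp_on_append(400_000, now, 0))        # 1000000: threshold 0 acts like LogAppendTime
print(stamp_on_append(999_500, now, 600_000))  # 999500: within the threshold, kept
print(stamp_on_append(100_000, now, 600_000))  # 1000000: beyond the threshold, overridden
```

Any threshold strictly between the two limits produces a mix of producer and broker timestamps. That mix is the ambiguity raised in point 5c and in the closing question.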
Thanks,
Jiangjie (Becket) Qin

On Mon, Dec 7, 2015 at 10:33 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Bump up this thread.

Just to recap, the last proposal Jay made (with some implementation details added) was:

1. Allow users to stamp the message when producing.
2. When the broker receives a message, it looks at the difference between its local time and the timestamp in the message.
   - If the time difference is within a configurable max.message.time.difference.ms, the server will accept it and append it to the log.
   - If the time difference is beyond the configured max.message.time.difference.ms, the server will override the timestamp with its current local time and append the message to the log.
   - The default value of max.message.time.difference would be set to Long.MaxValue.
3. The configurable time difference threshold max.message.time.difference.ms will be a per-topic configuration.
4. The index will be built so that it has the following guarantees.
   - If a user searches by timestamp:
      - all the messages after that timestamp will be consumed;
      - the user might see earlier messages.
   - Log retention will look at the last entry in the time index file, because the last entry will be the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
   - Log rolling has to depend on the earliest timestamp. In this case we may need to keep an in-memory timestamp only for the current active log. On recovery, we will need to read the active log segment to get the timestamp of the earliest message.
5. The downsides of this proposal are:
   - The timestamp might not be monotonically increasing.
   - Log retention might become non-deterministic. That is, when a message will be deleted now depends on the timestamps of the other messages in the same log segment. Those timestamps are provided by users, within a range that depends on the time difference threshold configuration.
   - The semantic meaning of the timestamps in the messages could be a little vague, because some of them come from the producer and some are overwritten by brokers.
6. Although the proposal has some downsides, it gives users flexibility in how to use the timestamp.
   - If the threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
   - If the threshold is set to 0, the timestamp in the message is equivalent to LogAppendTime.

This proposal actually allows users to use either CreateTime or LogAppendTime without introducing two timestamp concepts at the same time. I have updated the wiki for KIP-32 and KIP-33 with this proposal.

One thing I am thinking is that instead of having a time difference threshold, should we simply have a TimestampType configuration? In most cases, people will set the threshold either to 0 or to Long.MaxValue. Setting anything in between will make the timestamp in the message meaningless to users, since they don't know whether the timestamp has been overwritten by the brokers.

Any thoughts?

Thanks,
Jiangjie (Becket) Qin

On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi Jay,

Thanks for such a detailed explanation.
I think we are both trying to make CreateTime work for us if possible. To me, "work" means clear guarantees on:
1. Log retention time enforcement.
2. Log rolling time enforcement (this might be less of a concern, as you pointed out).
3. Applications searching for messages by time.

WRT (1), I agree the expectation for log retention might differ depending on whom we ask. But my concern is about the level of guarantee we give to users. My observation is that a clear guarantee to users is critical regardless of the mechanism we choose. This is the subtle but important difference between using LogAppendTime and CreateTime.

Let's say a user asks this question: how long will my message stay in Kafka?

If we use LogAppendTime for log retention, the answer is that the message will stay in Kafka for the retention time after it is produced (to be more precise, upper bounded by log.rolling.ms + log.retention.ms). Users have a clear guarantee and can decide whether or not to put the message into Kafka.
They can also decide how to adjust the retention time according to their requirements.
If we use CreateTime for log retention, the answer would be "it depends". The best answer we can give is "at least retention.ms", because there is no guarantee of when the messages will be deleted after that. If a message sits somewhere behind a larger create time, it might stay longer than expected. We don't know how much longer, because that depends on the create time. In this case, it is hard for users to decide what to do.

I am worried about this because a blurry guarantee has bitten us before, e.g. with topic creation. We have received many questions like "why is my topic not there after I created it?". I can imagine receiving similar questions, such as "why is my message still there after the retention time has been reached?". So my understanding is that a clear and solid guarantee is better than a mechanism that works in most cases but occasionally does not.
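The retention argument can be made concrete with a toy calculation. It uses the rule from the recap: a segment is deleted once the largest timestamp in it (its last time index entry) expires. This is a minimal sketch with hypothetical function names. It ignores that the active segment is never deleted and treats timestamps as plain millisecond integers.

```python
from typing import List


def deletable_after(segment_timestamps: List[int], retention_ms: int) -> int:
    """Retention keyed on the segment's largest timestamp: no message in the
    segment is deleted before this time."""
    return max(segment_timestamps) + retention_ms


def extra_lifetime(ts: int, segment_timestamps: List[int], retention_ms: int) -> int:
    """How much longer than retention_ms a message with timestamp ts survives."""
    return deletable_after(segment_timestamps, retention_ms) - (ts + retention_ms)


# CreateTime: a message stamped 1000 shares a segment with one stamped 9000.
segment = [1000, 2000, 9000]
print(deletable_after(segment, 5000))       # 14000
print(extra_lifetime(1000, segment, 5000))  # 8000
```

With CreateTime, the extra lifetime depends on whatever producer-supplied timestamps share the segment, which is the "it depends" answer. With LogAppendTime, the timestamps within a segment are broker append times. The extra lifetime is then roughly bounded by the rolling interval, which matches the log.rolling.ms + log.retention.ms bound quoted above.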
If we think of the retention guarantee we provide with LogAppendTime, it is not broken as you said, because we are telling users that log retention is NOT based on create time in the first place.

WRT (3), no matter whether we index on LogAppendTime or CreateTime, the best guarantee we can provide to users is "not missing messages after a certain timestamp". Therefore I would really like to index on CreateTime, because that is the timestamp we provide to users, and we can have a solid guarantee.
On the other hand, indexing on LogAppendTime while giving users CreateTime does not provide a solid guarantee when users search by timestamp. It only works when LogAppendTime is never earlier than CreateTime. This is a reasonable assumption and we can easily enforce it.

With the above, I am not sure we can avoid a server timestamp if we want log retention to work with a clear guarantee. For the search-by-timestamp use case, I really want the index built on CreateTime.
But with a reasonable assumption and timestamp enforcement, a LogAppendTime index would also work.

Thanks,

Jiangjie (Becket) Qin

On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

Let me see if I can address your concerns:

> 1. Let's say we have two source clusters that are mirrored to the same target cluster. For some reason one of the mirror makers from a cluster dies, and after fixing the issue we want to resume mirroring. In this case it is possible that when the mirror maker resumes mirroring, the timestamps of the messages have already gone beyond the acceptable timestamp range on the broker. In order to let those messages through, we have to bump up max.append.delay for all the topics on the target broker. This could be painful.

Actually what I was suggesting was different.
Here is my observation: clusters/topics produced to directly by applications have a valid assertion that log append time and create time are similar (let's call these "unbuffered"). Other clusters/topics, such as those that receive data from a database, a log file, or another Kafka cluster, don't have that assertion; for these "buffered" clusters, data can be arbitrarily late. This means any use of log append time on buffered clusters is not very meaningful, and since create time and log append time "should" be similar on unbuffered clusters, you can probably use either there.

Using log append time on buffered clusters actually results in bad things. If you request the offset for a given time, you don't end up getting data for that time but rather data that showed up at that time. If you try to retain 7 days of data it may mostly work, but any kind of bootstrapping will result in retaining much more (potentially the whole database contents!).
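Both effects described above can be illustrated with a toy bootstrap. This sketch assumes a hypothetical buffered topic that receives 30 days of database rows (one per day) all within a few seconds on day 30, and checks retention per record rather than per segment, which is a simplification. `count_retained` and `first_at_or_after` are illustrative names, not Kafka APIs.

```python
from typing import List, Tuple

DAY_MS = 24 * 60 * 60 * 1000
RETENTION_MS = 7 * DAY_MS

# Each record is (create_time, append_time).
Record = Tuple[int, int]


def count_retained(records: List[Record], now: int, use_append_time: bool) -> int:
    """Count records a per-record time-based retention check keeps at `now`."""
    kept = 0
    for create_time, append_time in records:
        ts = append_time if use_append_time else create_time
        if now - ts < RETENTION_MS:
            kept += 1
    return kept


def first_at_or_after(records: List[Record], t: int, use_append_time: bool) -> int:
    """Index of the first record whose chosen timestamp is >= t."""
    for i, (create_time, append_time) in enumerate(records):
        ts = append_time if use_append_time else create_time
        if ts >= t:
            return i
    return len(records)


# Bootstrap: 30 days of rows, loaded one second apart on day 30.
rows = [(day * DAY_MS, 30 * DAY_MS + day * 1000) for day in range(30)]
now = 31 * DAY_MS

print(count_retained(rows, now, use_append_time=True))   # 30: whole history kept
print(count_retained(rows, now, use_append_time=False))  # 5: rows from days 25-29
# Asking for "data from day 25":
print(first_at_or_after(rows, 25 * DAY_MS, use_append_time=True))   # 0: the day-0 row
print(first_at_or_after(rows, 25 * DAY_MS, use_append_time=False))  # 25
```

With append time, a 7-day retention keeps the entire bootstrapped history, and a lookup for day 25 starts at the row created on day 0, because every row "showed up" on day 30.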
So what I am suggesting in terms of max.append.delay is that unbuffered clusters would have it set and buffered clusters would not. In other words, in LI terminology, tracking and metrics clusters would have it enforced; aggregate and replica clusters wouldn't.

So you DO have the issue of potentially maintaining more data than you need on aggregate clusters if your mirroring skews, but you DON'T need to tweak the setting as you described.

> 2. Let's say in the above scenario we let the messages in. At that point some log segments in the target cluster might have a wide range of timestamps. Like Guozhang mentioned, log rolling could be tricky because the first time index entry does not necessarily have the smallest timestamp of all the messages in the log segment. Instead, it is the largest timestamp ever seen. We have to scan the entire log to find the message with the smallest timestamp to see if we should roll.
I think there are two uses for time-based log rolling:
1. Making offset lookup by timestamp work.
2. Ensuring we don't retain data indefinitely if it is supposed to be purged after 7 days.

But think about these two use cases. (1) is totally obviated by the time=>offset index we are adding, which yields much more granular offset lookups. (2) is actually totally broken if you switch to append time, right? If you want to be sure, for security/privacy reasons, that you only retain 7 days of data, then if the log append time and create time diverge you actually violate this requirement.

I think 95% of people care about (1), which is solved in the proposal, and (2) is actually broken today as well as in both proposals.

> 3. Theoretically it is possible that an older log segment contains timestamps that are older than all the messages in a newer log segment. It would be weird if we were supposed to delete the newer log segment before we delete the older log segment.
The index timestamps would always be a lower bound (i.e. the maximum at that time), so I don't think that is possible.

> 4. In the bootstrap case, if we reload the data into a Kafka cluster, we have to make sure we configure the topic correctly before we load the data. Otherwise the messages might either be rejected because the timestamp is too old, or be deleted immediately because the retention time has been reached.

See (1).

-Jay

On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hey Jay and Guozhang,

Thanks a lot for the reply. So if I understand correctly, Jay's proposal is:

1. Let the client stamp the message create time.
2. The broker builds the index based on the client-stamped message create time.
3. The broker only accepts messages whose create time is within the current time plus/minus T (T is a configuration, max.append.delay, which could be a topic-level configuration); if the timestamp is out of this range, the broker rejects the message.
4. Because the create time of messages can be out of order, when the broker builds the time-based index it only guarantees that if a consumer starts consuming from the offset returned by a search for timestamp t, it will not miss any message created after t, but might see some messages created before t.

To build the time-based index, every time a broker needs to insert a new time index entry, the entry would be {Largest_Timestamp_Ever_Seen -> Current_Offset}. This basically means any timestamp larger than Largest_Timestamp_Ever_Seen must come after this offset, because the broker has never seen it before. So we don't miss any message with a larger timestamp.
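Items 3 and 4 plus the index rule above can be sketched as a toy append path. This is a minimal model under stated assumptions, not the broker implementation: index entries are written every `index_interval` messages (a stand-in for a byte threshold), "Current_Offset" is taken to mean the offset of the message just appended, and `ToyLog` and `TimestampOutOfRange` are hypothetical names.

```python
import bisect
from typing import List


class TimestampOutOfRange(Exception):
    """Hypothetical error for a create time outside now +/- max.append.delay."""


class ToyLog:
    def __init__(self, max_append_delay_ms: int, index_interval: int = 2):
        self.max_append_delay_ms = max_append_delay_ms
        # Messages between index entries (an assumption; a real index
        # would use a byte threshold).
        self.index_interval = index_interval
        self.timestamps: List[int] = []     # create time per offset
        self.index_ts: List[int] = []       # Largest_Timestamp_Ever_Seen, non-decreasing
        self.index_offsets: List[int] = []  # Current_Offset for each entry
        self.max_ts_seen = -1

    def append(self, create_time: int, now: int) -> int:
        # Item 3: only accept create times within now +/- T.
        if abs(create_time - now) > self.max_append_delay_ms:
            raise TimestampOutOfRange(create_time)
        offset = len(self.timestamps)
        self.timestamps.append(create_time)
        self.max_ts_seen = max(self.max_ts_seen, create_time)
        if (offset + 1) % self.index_interval == 0:
            # Entry {Largest_Timestamp_Ever_Seen -> Current_Offset}: every
            # message at or before `offset` has a timestamp <= max_ts_seen.
            self.index_ts.append(self.max_ts_seen)
            self.index_offsets.append(offset)
        return offset

    def offset_for_time(self, t: int) -> int:
        # Item 4: reading from the returned offset misses no message with
        # timestamp >= t, but may also return some older messages.
        i = bisect.bisect_left(self.index_ts, t)  # entries [0, i) have max < t
        return 0 if i == 0 else self.index_offsets[i - 1] + 1


log = ToyLog(max_append_delay_ms=600_000)  # e.g. a 10-minute bound
for create_time in [100, 300, 200, 500, 400, 700]:
    log.append(create_time, now=1_000)

print(log.offset_for_time(450))  # 2: offsets 0 and 1 (100, 300) are older than 450

try:
    log.append(1_000_000, now=1_000)  # far outside now +/- T
except TimestampOutOfRange:
    print("rejected")
```

Reading from offset 2 returns 200 and 400 as well as 500 and 700: the older messages are the "might see some messages created before t" part of the guarantee.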
(@Guozhang, in this case, for log retention we only need to look at the last time index entry, because it must hold the largest timestamp ever seen; if that timestamp is overdue, we can safely delete any log segment before it. So we don't need to scan the log segment files for log retention.)

I assume that we are still going to have the new FetchRequest to allow time index replication for replicas.

I think Jay's main point here is that we don't want to have two timestamp concepts in Kafka, which I agree is a reasonable concern. I also agree that create time is more meaningful than LogAppendTime for users. But I am not sure that basing everything on CreateTime would work in all cases. Here are my questions about this approach:

1. Let's say we have two source clusters that are mirrored to the same target cluster. For some reason one of the mirror makers from a cluster dies, and after fixing the issue we want to resume mirroring. In this case it is possible that when the mirror maker resumes mirroring, the timestamps of the messages have already gone beyond the acceptable timestamp range on the broker. In order to let those messages through, we have to bump up max.append.delay for all the topics on the target broker. This could be painful.

2. Let's say in the above scenario we let the messages in. At that point some log segments in the target cluster might have a wide range of timestamps. Like Guozhang mentioned, log rolling could be tricky because the first time index entry does not necessarily have the smallest timestamp of all the messages in the log segment. Instead, it is the largest timestamp ever seen. We have to scan the entire log to find the message with the smallest timestamp to see if we should roll.

3. Theoretically it is possible that an older log segment contains timestamps that are older than all the messages in a newer log segment. It would be weird if we were supposed to delete the newer log segment before we delete the older log segment.

4. In the bootstrap case, if we reload the data into a Kafka cluster, we have to make sure we configure the topic correctly before we load the data. Otherwise the messages might either be rejected because the timestamp is too old, or be deleted immediately because the retention time has been reached.

I am very concerned about the operational overhead and the ambiguity of guarantees we introduce if we rely purely on CreateTime.

It looks to me that the biggest issue with adopting CreateTime everywhere is that CreateTime can have big gaps. These gaps could be caused by several cases:
[1]. Faulty clients
[2]. Natural delays from different sources
[3]. Bootstrap
[4]. Failure recovery

Jay's alternative proposal solves [1], and perhaps [2] as well if we are able to set a reasonable max.append.delay. But it does not seem to address [3] and [4]. I actually doubt that [3] and [4] are solvable, because it looks like the CreateTime gap is unavoidable in those two cases.

Thanks,

Jiangjie (Becket) Qin

On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Just to complete Jay's option, here is my understanding:

1. For log retention: if we want to remove data before time t, we look into the index file of each segment, find the largest timestamp t' < t, find the corresponding offset, and start scanning to the end of the segment. If there is no entry with timestamp >= t, we can delete this segment; if a segment's smallest index timestamp is larger than t, we can skip that segment.

2. For log rolling: if we want to start a new segment after time t, we look into the active segment's index file. If the largest timestamp is already > t, we can roll a new segment immediately; if it is < t, we read its corresponding offset and start scanning to the end of the segment, and if we find a record whose timestamp > t, we can roll a new segment.
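The two procedures above can be sketched for a single segment. The sketch assumes, as in Becket's formulation of the index, that each entry stores the largest timestamp seen so far together with the position of the last message it covers; Guozhang's message does not pin down the entry format, and `can_delete` and `should_roll` are illustrative names.

```python
from typing import List, Tuple

# Sparse time index for one segment: (largest timestamp seen so far,
# position of the last message covered), positions relative to the segment.
Index = List[Tuple[int, int]]


def can_delete(timestamps: List[int], index: Index, t: int) -> bool:
    """Retention: True if no message in the segment has timestamp >= t."""
    # Shortcut: the smallest index timestamp is already >= t, so some
    # message in this segment is too new to delete.
    if index and index[0][0] >= t:
        return False
    # Find the largest indexed timestamp t' < t; everything it covers is older than t.
    start = 0
    for ts, pos in index:
        if ts >= t:
            break
        start = pos + 1
    # Scan from there to the end of the segment.
    return all(ts < t for ts in timestamps[start:])


def should_roll(timestamps: List[int], index: Index, t: int) -> bool:
    """Rolling: True if the active segment holds a record with timestamp > t."""
    if index:
        last_ts, last_pos = index[-1]
        if last_ts > t:
            return True  # roll immediately
        scan_from = last_pos + 1  # only the unindexed tail needs scanning
    else:
        scan_from = 0
    return any(ts > t for ts in timestamps[scan_from:])


segment = [100, 300, 200, 500, 400, 450, 350]
index = [(300, 1), (500, 3), (500, 5)]  # one entry every two messages

print(can_delete(segment, index, 450))   # False: position 3 has timestamp 500
print(can_delete(segment, index, 600))   # True
print(should_roll(segment, index, 500))  # False: only position 6 (350) is scanned
```

With a max-so-far index, `can_delete` is equivalent to "largest timestamp in the segment < t" and `should_roll` to "largest timestamp > t"; the index only limits how much of the segment has to be scanned, which is the cost trade-off discussed next.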
For log rolling we only need to scan a small portion of the active segment, which should be fine; for log retention we may in the worst case end up scanning all segments, but in practice we may skip most of them since their smallest timestamp in the index file is larger than t.

Guozhang

On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <j...@confluent.io> wrote:

I think it should be possible to index out-of-order timestamps. The timestamp index would be similar to the offset index, a memory-mapped file appended to as part of the log append, but would have the format
  timestamp offset
The timestamp entries would be monotonic and, as with the offset index, would be written no more often than every 4k (or some configurable threshold to keep the index small; actually, for timestamps it could probably be much sparser than 4k).

A search for a timestamp t yields an offset o before which no prior message has a timestamp >= t. In other words, if you read the log starting with o, you are guaranteed not to miss any messages occurring at t or later, though you may get many before t (due to out-of-orderness). Unlike the offset index, this bound doesn't really have to be tight (i.e. there is probably no need to go search the log itself, though you could).

-Jay

On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <j...@confluent.io> wrote:

Here's my basic take:
- I agree it would be nice to have a notion of time baked in if it were done right.
- All the proposals so far seem pretty complex; I think they might make things worse rather than better overall.
- I think adding 2x8 byte timestamps to the message is probably a non-starter from a size perspective.
- Even if it isn't in the message, having two notions of time that control different things is a bit confusing.
- The mechanics of basing retention etc. on log append time when that's not in the log seem complicated.

To that end, here is a possible 4th option. Let me know what you think.
The basic idea is that the message creation time is closest to what the user actually cares about but is dangerous if set wrong. So rather than substitute another notion of time, let's try to ensure the correctness of the message creation time by preventing arbitrarily bad message creation times.

First, let's see if we can agree that log append time is not something anyone really cares about but rather an implementation detail. The timestamp that matters to the user is when the message occurred (the creation time). The log append time is basically just an approximation of this, on the assumption that the message creation and the message receipt on the server occur pretty close together and the reason to prefer .
But as these values diverge, the issue starts to become apparent. Say you set the retention to one week and then mirror data from a topic containing two years of retention. Your intention is clearly to keep the last week, but because the mirroring is appending right now, you will keep two years.

The reason we like log append time is that we are (justifiably) concerned that in certain situations the creation time may not be trustworthy. The same problem exists on the servers, but there are fewer servers and they run only the Kafka code, so it is less of an issue.

There are two possible ways to handle this:

1. Just tell people to add size-based retention. I think this is not entirely unreasonable; we're basically saying we retain data based on the timestamp you give us in the data. If you give us bad data, we will retain it for a bad amount of time. If you want to ensure we don't retain "too much" data, define "too much" by setting a size-based retention setting. This is not entirely unreasonable, but it kind of suffers from a "one bad apple" problem in a very large environment.
2. Prevent bad timestamps. In general we can't say a timestamp is bad. However, the definition we're implicitly using is that there is a set of topics/clusters where the creation timestamp should always be "very close" to the log append timestamp. This is true for data sources that have no buffering capability (which is very common at LinkedIn, but rarer elsewhere). The solution in this case would be to allow a setting along the lines of max.append.delay, which checks the creation timestamp against the server time to look for too large a divergence. The broker would then either reject the message or override its timestamp with the server time.

So in LI's environment you would configure the clusters used for direct, unbuffered message production (e.g. tracking and metrics local) to enforce a reasonably aggressive timestamp bound (say 10 mins), and all other clusters would just inherit these.

The downside of this approach is requiring the special configuration.
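To make option 2 concrete, here is a minimal Java sketch of the kind of check a max.append.delay setting might perform. `AppendDelayGuard`, `DivergencePolicy`, and `maxAppendDelayMs` are hypothetical names chosen for illustration; they are not Kafka APIs or configs. Measuring the divergence in both directions (clock behind or ahead) is also an assumption; the email only says "too large a divergence".

```java
import java.util.OptionalLong;

/** What to do when a CreateTime is too far from the broker clock (hypothetical policy). */
enum DivergencePolicy { REJECT, OVERRIDE }

/** Hypothetical broker-side guard modelled on the "max.append.delay" idea. */
final class AppendDelayGuard {
    private final long maxAppendDelayMs;
    private final DivergencePolicy policy;

    AppendDelayGuard(long maxAppendDelayMs, DivergencePolicy policy) {
        this.maxAppendDelayMs = maxAppendDelayMs;
        this.policy = policy;
    }

    /**
     * Returns the timestamp to store, or empty if the message should be rejected.
     * A CreateTime within maxAppendDelayMs of the server clock (in either direction) is kept as-is.
     */
    OptionalLong validate(long createTimeMs, long serverTimeMs) {
        long divergence = Math.abs(serverTimeMs - createTimeMs);
        if (divergence <= maxAppendDelayMs) {
            return OptionalLong.of(createTimeMs);
        }
        // Too far off: either drop the message or re-stamp it with the broker's clock.
        return policy == DivergencePolicy.OVERRIDE
                ? OptionalLong.of(serverTimeMs)
                : OptionalLong.empty();
    }
}
```

With a 10-minute bound, a message stamped one minute ago keeps its CreateTime, while one stamped 30 minutes ago is either rejected (REJECT) or re-stamped with the server time (OVERRIDE).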
However, I think in 99% of environments this could be skipped entirely; it's only when the ratio of clients to servers gets so massive that you need to do this. The primary upside is that you have a single authoritative notion of time which is closest to what a user would want and is stored directly in the message.

I'm also assuming there is a workable approach for indexing non-monotonic timestamps, though I haven't actually worked that out.

-Jay

On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Bumping up this thread, although most of the discussion was on the discussion thread of KIP-31 :)

I just updated the KIP page to add a detailed solution for the option (Option 3) that does not expose the LogAppendTime to users.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message

The option has a minor change to the fetch request to allow fetching the time index entry as well. I kind of like this solution because it's just doing what we need without introducing other things.

It would be great to hear your feedback. I can explain more during tomorrow's KIP hangout.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi Jay,

I have copied your feedback on the timestamp proposal from the discussion thread of KIP-31 here.
Please see the replies inline. The main change I made compared with the previous proposal is to add both CreateTime and LogAppendTime to the message.

On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Becket,
>
> I was proposing splitting up the KIP just for simplicity of discussion. You can still implement them in one patch. I think otherwise it will be hard to discuss/vote on them, since if you like the offset proposal but not the time proposal, what do you do?
>
> Introducing a second notion of time into Kafka is a pretty massive philosophical change, so I think it kind of warrants its own KIP; it isn't just "Change message format".
>
> WRT time, I think one thing to clarify in the proposal is how MM will have access to set the timestamp. Presumably this will be a new field in ProducerRecord, right? If so, then any user can set the timestamp, right? I'm not sure you answered the questions around how this will work for MM, since when MM retains timestamps from multiple partitions they will then be out of order and in the past (so the max(lastAppendedTimestamp, currentTimeMillis) override you proposed will not work, right?). If we don't do this, then when you set up mirroring the data will all be new and you have the same retention problem you described. Maybe I missed something...?

lastAppendedTimestamp means the timestamp of the message that was last appended to the log.

If a broker is the leader, since it assigns the timestamp by itself, lastAppendedTimestamp will be its local clock when it appended the last message. So if there is no leader migration, max(lastAppendedTimestamp, currentTimeMillis) = currentTimeMillis.

If a broker is a follower, because it keeps the leader's timestamp unchanged, lastAppendedTimestamp will be the leader's clock when the leader appended that message. The follower keeps track of lastAppendedTimestamp only in case it becomes the leader later on. At that point, it is possible that the last appended message was stamped by the old leader, but the new leader's currentTimeMillis < lastAppendedTimestamp. When a new message comes in, instead of stamping it with the new leader's currentTimeMillis, we have to stamp it with lastAppendedTimestamp to avoid the timestamp in the log going backward.

The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the broker-side clock. If MM produces messages carrying different LogAppendTimes from the source clusters to the same target cluster, those LogAppendTimes will be ignored and re-stamped by the target cluster.

I added a use case example for mirror maker in KIP-32. There is also a corner case discussion about when we need the max(lastAppendedTimestamp, currentTimeMillis) trick. Could you take a look and see if that answers your question?
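The leader/follower bookkeeping in the reply above can be sketched in a few lines of Java. This is a minimal illustration under stated assumptions, not the KIP-32 implementation: `LogAppendTimeAssigner` and its methods are hypothetical names, timestamps are plain epoch milliseconds, and one instance stands for one partition replica.

```java
/**
 * Hypothetical per-replica LogAppendTime assigner illustrating the
 * max(lastAppendedTimestamp, currentTimeMillis) rule.
 */
final class LogAppendTimeAssigner {
    // Timestamp of the last message appended to this replica's log, whether it was
    // stamped locally (as leader) or copied unchanged from the leader (as follower).
    private long lastAppendedTimestamp = Long.MIN_VALUE;

    /** Leader path: stamp a new message without letting the log's time go backward. */
    long stampAsLeader(long currentTimeMillis) {
        long ts = Math.max(lastAppendedTimestamp, currentTimeMillis);
        lastAppendedTimestamp = ts;
        return ts;
    }

    /**
     * Follower path: the leader's timestamp is kept unchanged; the follower only
     * remembers it in case it becomes the leader later. The leader's stamps are
     * already non-decreasing under the rule above, so a plain assignment suffices.
     */
    void recordFromLeader(long leaderTimestamp) {
        lastAppendedTimestamp = leaderTimestamp;
    }
}
```

For example, a follower that last copied a message stamped 5000 by an old leader, and then becomes leader while its own clock reads 4000, stamps its next message 5000. Once its clock passes 5000, it stamps with its own time again.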
> My main motivation is that, given that both Samza and Kafka Streams are doing work that implies a mandatory client-defined notion of time, I really think introducing a different mandatory notion of time in Kafka is going to be quite odd. We should think hard about how client-defined time could work. I'm not sure if it can, but I'm also not sure that it can't. Having both will be odd. Did you chat about this with Yi/Kartik on the Samza side?

I talked with Kartik and realized that it would be useful to have a client timestamp to facilitate use cases like stream processing. I was trying to figure out if we can simply use the client timestamp without introducing the server time. There is some discussion of this in the KIP.

The key problems we want to solve here are:
1. We want log retention and rolling to depend on the server clock.
2. We want to make sure the log-associated timestamp is retained when replicas move.
3. We want to use the timestamp in a way that allows searching by timestamp.

For 1 and 2, an alternative is to pass the log-associated timestamp through replication, which means we need a different protocol for replica fetching to pass the log-associated timestamp. That is actually complicated, and there could be a lot of corner cases to handle; e.g. if an old leader starts to fetch from the new leader, should it also update the timestamps of all its old log segments?

I think a client-side timestamp would actually be better for 3 if we can find a way to make it work. So far I have not been able to convince myself that having only a client-side timestamp would work, mainly because of 1 and 2. There are a few situations I mentioned in the KIP.

> When you are saying it won't work, are you assuming some particular implementation? Maybe that the index is a monotonically increasing set of pointers to the least record with a timestamp larger than the index time? In other words, a search for time X gives the largest offset at which all records are <= X?

It is a promising idea.
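One way to read Jay's suggestion is an index keyed on the running maximum timestamp, which stays monotonic even when record timestamps are not. The following Java sketch is a hypothetical in-memory version of that reading: `RunningMaxTimeIndex` is an illustrative name, offsets are assumed to be assigned sequentially from 0, and on-disk layout and multiple segments are ignored.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory time index for non-monotonic timestamps. Each entry maps a
 * new running-maximum timestamp to the first offset at which that maximum was reached,
 * so both keys and values only ever increase.
 */
final class RunningMaxTimeIndex {
    private final TreeMap<Long, Long> maxTimestampToOffset = new TreeMap<>();
    private long runningMax = Long.MIN_VALUE;
    private long nextOffset = 0L;

    /** Appends a record at nextOffset (offsets are assumed to be sequential). */
    void append(long timestamp) {
        if (timestamp > runningMax) {
            runningMax = timestamp;
            maxTimestampToOffset.put(timestamp, nextOffset);
        }
        nextOffset++;
    }

    /**
     * Returns the largest offset o such that every record before o has timestamp <= x,
     * i.e. the offset of the first record whose timestamp exceeds x, or the log end
     * offset if no record exceeds x.
     */
    long search(long x) {
        Map.Entry<Long, Long> firstLarger = maxTimestampToOffset.higherEntry(x);
        return firstLarger == null ? nextOffset : firstLarger.getValue();
    }
}
```

For timestamps 10, 5, 20, 15, 30 at offsets 0 to 4, the index holds only {10->0, 20->2, 30->4}, and search(12) returns 2: offsets 0 and 1 (timestamps 10 and 5) are both <= 12, while offset 2 (timestamp 20) is not. A record that does not raise the running maximum adds no entry, so new entries only ever land at the high end of the map.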
We can probably have an in-memory index like that, but it might be complicated to have a file on disk like that. Imagine there are two timestamps T0 < T1. We see message Y created at T1 and create an index like [T1->Y]; then we see message X created at T0. Supposedly we should then have an index that looks like [T0->X, T1->Y]. That is easy to do in memory, but we might have to rewrite the index file completely. Maybe we can have the first entry with timestamp 0, and only update the first pointer for any out-of-range timestamp, so the index will be [0->X, T1->Y]. Also, the ranges of timestamps in the log segments can overlap with each other. That means we either need to keep a cross-segment index file or check the index file of every log segment.

I separated out the time-based log index into KIP-33 because it can be an independent follow-up feature, as Neha suggested. I will try to make the time-based index work with client-side timestamps.

> For retention, I agree with the problem you point out, but I think what you are saying in that case is that you want a size limit too. If you use system time you actually hit the same problem: say you do a full dump of a DB table with a setting of 7 days retention; your retention will actually not get enforced for the first 7 days because the data is "new to Kafka".

I kind of think the size limit here is orthogonal. It is a valid use case for people to want only time-based retention. In your example, depending on the client timestamp might break the functionality: say it is a bootstrap case where people actually need to read all the data. If we depend on the client timestamp, the data might be deleted as soon as it arrives at the broker.
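The bootstrap scenario can be illustrated with a simplified retention check. In this hypothetical Java sketch, `RetentionCheck.isExpired` treats data as deletable once its newest timestamp is older than the retention window. Real Kafka retention is evaluated per log segment, which is ignored here.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified time-based retention check. */
final class RetentionCheck {
    /**
     * True if data whose newest timestamp is newestTimestampMs has aged past retentionMs.
     * Which timestamp is passed in (client CreateTime or broker LogAppendTime) decides the outcome.
     */
    static boolean isExpired(long newestTimestampMs, long nowMs, long retentionMs) {
        return nowMs - newestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = TimeUnit.DAYS.toMillis(7);
        long now = System.currentTimeMillis();
        // A bootstrap load of rows created 30 days ago, appended to the broker right now.
        long createTime = now - TimeUnit.DAYS.toMillis(30);
        long logAppendTime = now;
        System.out.println("expired by CreateTime:    " + isExpired(createTime, now, retentionMs));
        System.out.println("expired by LogAppendTime: " + isExpired(logAppendTime, now, retentionMs));
    }
}
```

Running `main` prints `true` for the CreateTime-based check and `false` for the LogAppendTime-based one: the same 30-day-old rows are expired on arrival if retention follows the client's CreateTime, but kept for the full 7 days if it follows the broker's append time.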
It might be too demanding to expect the broker to understand what people actually want to do with the data coming in. So the guarantee of using the server-side timestamp is that "after being appended to the log, all messages will be available on the broker for the retention time", which clients cannot change.

> -Jay

On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi folks,

This proposal was previously in KIP-31, and we separated it into KIP-32 per Neha and Jay's suggestion.

The proposal is to add the following two timestamps to Kafka messages:
- CreateTime
- LogAppendTime

The CreateTime will be set by the producer and will not change after that.
The LogAppendTime will be set by the broker for purposes such as enforcing log retention and log rolling.

Thanks,

Jiangjie (Becket) Qin

--
-- Guozhang