Hi Guozhang, That makes sense. I will update the KIP wiki and bump up the voting thread to let people know about this change.
Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 10:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:

One motivation of my proposal is actually to avoid any clients trying to read the timestamp type from the topic metadata response and behave differently, since:

1) The topic metadata response is not always in sync with the source of truth (ZK), so by the time the clients realize that the config has changed it may already be too late (i.e., for a consumer, records with the wrong timestamp could already have been returned to the user).

2) The client logic could be a bit simpler, which will benefit non-Java development a lot. Also, we can avoid adding this to the topic metadata response.

Guozhang

On Tue, Jan 26, 2016 at 3:20 PM, Becket Qin <becket....@gmail.com> wrote:

My hesitation about the changed protocol is that if we are going to have topic configurations returned in the topic metadata, the current protocol makes more sense: because the timestamp type is a topic-level setting, we don't need to put it into each message. That assumes a timestamp type change on a topic rarely happens, and that if it is ever needed the existing data should be wiped out.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 2:07 PM, Becket Qin <becket....@gmail.com> wrote:

Bumping up this thread per discussion on the KIP hangout.

During the implementation of the KIP, Guozhang raised another proposal on how to indicate the timestamp type used by messages, so we want to hear people's opinions on it.

The current and the new proposal differ only for messages that are a) compressed and b) using LogAppendTime.

For compressed messages using LogAppendTime, the current proposal handles timestamps as follows:

1. When a producer produces the messages, it tries to set the timestamp to -1 for the inner messages if it knows LogAppendTime is used.
2. When a broker receives the messages, it overwrites the timestamp of the inner messages to -1 if needed and writes the server time to the wrapper message. The broker re-compresses if an inner message timestamp is overwritten.
3. When a consumer receives the messages, it sees that the inner message timestamp is -1, so the wrapper message timestamp is used.

Implementation-wise, this proposal requires the producer to set the timestamps of the inner messages correctly to avoid broker-side re-compression. The short-term solution is to let the producer infer the timestamp type from the ProduceResponse and set the correct timestamp afterwards, which means the first few batches will still be re-compressed on the broker. The long-term solution is to have the producer fetch the topic configuration during metadata updates.

The proposed modification is:

1. When a producer produces the messages, it always uses create time.
2. When a broker receives the messages, it ignores the inner message timestamps, simply sets a timestamp-type attribute bit of the wrapper message to 1, and sets the timestamp of the wrapper message to the server time. (The broker will also set the timestamp-type attribute bit accordingly for non-compressed messages using LogAppendTime.)
3. When a consumer receives the messages, it checks the timestamp-type attribute bit of the wrapper message. If the bit is set to 1, the inner messages' timestamps are ignored and the wrapper message's timestamp is used.

This approach uses an extra attribute bit. The advantage of the modified protocol is that consumers are able to know the timestamp type, and broker-side re-compression is completely avoided no matter what value the producer sends. The downside is that the inner messages will carry wrong timestamps.

We want to see if people have concerns over the modified approach.

Thanks,

Jiangjie (Becket) Qin
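To make the consumer-side rule in the modified proposal concrete, here is a minimal Java sketch of resolving effective timestamps from the wrapper's attribute bit. The types and the mask position are hypothetical stand-ins for illustration, not Kafka's actual message-format classes:

import java.util.List;
import java.util.stream.Collectors;

public class EffectiveTimestamps {
    // Assumed position of the timestamp-type attribute bit; the KIP wiki
    // defines the real wire-format layout.
    static final byte TIMESTAMP_TYPE_MASK = 0x08;

    record Inner(long timestamp) {}                 // producer-set create time
    record Wrapper(byte attributes, long timestamp, List<Inner> inner) {}

    static List<Long> resolve(Wrapper w) {
        boolean logAppendTime = (w.attributes() & TIMESTAMP_TYPE_MASK) != 0;
        return w.inner().stream()
                // Bit = 1: ignore inner timestamps, use the broker-stamped wrapper time.
                // Bit = 0: each inner message's create time is authoritative.
                .map(m -> logAppendTime ? w.timestamp() : m.timestamp())
                .collect(Collectors.toList());
    }
}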
On Tue, Dec 15, 2015 at 11:45 AM, Becket Qin <becket....@gmail.com> wrote:

Jun,

1. I agree it would be nice to have the timestamps used in a unified way. My concern is that if we let the server change the timestamps of the inner messages for LogAppendTime, it would force users of LogAppendTime to always pay the re-compression penalty, which defeats the purpose of KIP-31.

4. If there are no entries in the log segment, we can read from the time index of the previous log segment. If no previous entry is available after we have searched back to the earliest log segment, that means all the previous log segments with a valid time index entry have been deleted. In that case there should supposedly be only one log segment left (the active one), and we can simply set the latest timestamp to 0.

Guozhang,

Sorry for the confusion. By "the timestamp of the latest message" I actually meant "the timestamp of the message with the largest timestamp". So in your example the "latest message" is 5.

Thanks,

Jiangjie (Becket) Qin

On Tue, Dec 15, 2015 at 10:02 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Jun, Jiangjie,

I am confused about 3) here: if we use "the timestamp of the latest message", doesn't this mean we will roll the log whenever a message delayed by the rolling time is received as well? Just to clarify, my understanding of "the timestamp of the latest message", for example in the following log, is 1, not 5:

2, 3, 4, 5, 1

Guozhang

On Mon, Dec 14, 2015 at 10:05 PM, Jun Rao <j...@confluent.io> wrote:

1. Hmm, it's more intuitive if the consumer sees the same timestamp whether the messages are compressed or not. When message.timestamp.type=LogAppendTime, we will need to set the timestamp in each message if messages are not compressed, so that the follower can get the same timestamp. So it seems that we should do the same thing for the inner messages when messages are compressed.

4. I thought that on startup we restore the timestamp of the latest message by reading from the time index of the last log segment. So, what happens if there are no index entries?

Thanks,

Jun
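The answer Becket gives earlier in the thread (search backwards through older segments' time indexes, and fall back to 0 if every index entry is gone) can be sketched as follows; LogSegment here is a hypothetical stand-in for the broker's internal classes:

import java.util.List;
import java.util.OptionalLong;

public class StartupTimestampRestore {
    record LogSegment(long[] indexTimestamps) {
        OptionalLong lastTimeIndexEntry() {
            return indexTimestamps.length == 0
                    ? OptionalLong.empty()
                    : OptionalLong.of(indexTimestamps[indexTimestamps.length - 1]);
        }
    }

    // Walk segments from newest to oldest; the first non-empty time index wins.
    static long restoreLatestTimestamp(List<LogSegment> segmentsOldestFirst) {
        for (int i = segmentsOldestFirst.size() - 1; i >= 0; i--) {
            OptionalLong last = segmentsOldestFirst.get(i).lastTimeIndexEntry();
            if (last.isPresent()) {
                return last.getAsLong();
            }
        }
        // Every segment with a valid entry has been deleted; only the active
        // segment should remain, so fall back to 0 as suggested above.
        return 0L;
    }
}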
On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin <becket....@gmail.com> wrote:

Thanks for the explanation, Jun.

1. That makes sense. So maybe we can do the following:
(a) Set the timestamp of the compressed message to the latest timestamp of all its inner messages. This works for both LogAppendTime and CreateTime.
(b) If message.timestamp.type=LogAppendTime, the broker will overwrite all the inner message timestamps to -1 if they are not already -1. This is mainly for topics that are using LogAppendTime. Hopefully the producer will set the timestamp to -1 in the ProducerRecord to avoid server-side re-compression.

3. I see. That works. So the semantics of log rolling become "roll the log segment if it has been inactive since the latest message arrived."

4. Yes. If the largest timestamp is in the previous log segment, the time index for the current log segment does not have a valid offset in the current segment to point to. Maybe in that case we should build an empty time index.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao <j...@confluent.io> wrote:

1. I was thinking more about saving the decompression overhead in the follower. Currently, the follower doesn't decompress the messages. To keep it that way, the outer message needs to include the timestamp of the latest inner message so the follower can build the time index. The simplest thing to do is to change the timestamps in the inner messages if necessary, in which case there will be re-compression overhead. However, in the (hopefully more common) case where the timestamps of the inner messages don't have to be changed, there won't be any. In either case, the leader always sets the timestamp in the outer message to the timestamp of the latest inner message.

3. Basically, in each log segment we keep track of the timestamp of the latest message. If current time - timestamp of latest message > log rolling interval, we roll a new log segment. So, if messages with later timestamps keep getting added, we only roll new segments based on size. On the other hand, if no new messages are added to a log, we can force a log roll based on time, which addresses the issue in (b).

4. Hmm, the index is per segment and should only point to positions in the corresponding .log file, not previous ones, right?

Thanks,

Jun
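Jun's rolling rule in point 3 reduces to a simple predicate. A minimal sketch with invented parameter names, not the broker's actual config keys:

public class RollCheck {
    // Roll when the segment is full, or when no message newer than the
    // segment's latest timestamp has arrived for a whole rolling interval.
    static boolean shouldRoll(long segmentBytes, long maxSegmentBytes,
                              long latestTimestampMs, long nowMs, long rollIntervalMs) {
        return segmentBytes >= maxSegmentBytes
                || nowMs - latestTimestampMs > rollIntervalMs;
    }
}

Note that a delayed message, as in Guozhang's example above, can no longer force a roll under this rule, since it cannot move the latest timestamp backwards.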
On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

Thanks a lot for the comments. Please see inline replies.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao <j...@confluent.io> wrote:

> Hi, Becket,
>
> Thanks for the proposal. Looks good overall. A few comments below.
>
> 1. KIP-32 didn't say what timestamp should be set in a compressed message. We should probably set it to the timestamp of the latest message included in the compressed one. This way, during indexing, we don't have to decompress the message.

That is a good point. In normal cases the broker needs to decompress the message for verification purposes anyway, so building the time index does not add additional decompression. During time index recovery, however, having a timestamp in the compressed message might save the decompression.

Another thing I am thinking is that we should make sure KIP-32 works well with KIP-31, i.e. we don't want to re-compress just to add timestamps to messages. Taking the approach in my last email, the timestamps in the messages will either all be overwritten by the server if message.timestamp.type=LogAppendTime, or not be overwritten at all if message.timestamp.type=CreateTime.

Maybe we can use the timestamp in compressed messages in the following way. If message.timestamp.type=LogAppendTime, we have to overwrite the timestamps of all the messages anyway, so we can simply write the timestamp to the compressed message to avoid re-compression. If message.timestamp.type=CreateTime, we do not need to overwrite the timestamps; we either reject the entire compressed message or just leave the compressed message's timestamp at -1.

So the semantics of the timestamp field in a compressed message become: if it is greater than 0, LogAppendTime is used and the timestamp of the inner messages is the compressed message's LogAppendTime; if it is -1, CreateTime is used and the timestamp is in each individual inner message.

This sacrifices the speed of recovery but seems worthwhile because we avoid re-compression.
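The convention Becket describes can be decoded from the wrapper timestamp alone. A sketch of the consumer-side reading, with hypothetical types rather than the actual message-format code:

import java.util.List;
import java.util.stream.Collectors;

public class CompressedTimestampSemantics {
    static final long NO_TIMESTAMP = -1L;

    record Inner(long createTime) {}
    record Compressed(long timestamp, List<Inner> inner) {}

    static List<Long> resolve(Compressed c) {
        if (c.timestamp() == NO_TIMESTAMP) {
            // CreateTime: carried by each individual inner message.
            return c.inner().stream().map(Inner::createTime).collect(Collectors.toList());
        }
        // timestamp > 0: LogAppendTime, the broker's stamp applies to every inner message.
        return c.inner().stream().map(m -> c.timestamp()).collect(Collectors.toList());
    }
}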
> 2. In KIP-33, should we make the time-based index interval configurable? Perhaps we can default it to 60 secs, but allow users to configure smaller values if they want more precision.

Yes, we can do that.

> 3. In KIP-33, I am not sure if log rolling should be based on the earliest message. This would mean that we will need to roll a log segment every time we get a message delayed by the log rolling time interval. Also, on broker startup, we can get the timestamp of the latest message in a log segment pretty efficiently by just looking at the last time index entry, but getting the earliest timestamp requires a full scan of all log segments, which can be expensive. Previously, there were two use cases for time-based rolling: (a) more accurate time-based indexing and (b) retaining data by time (since the active segment is never deleted). (a) is already solved by a time-based index. For (b), if retention is based on the timestamp of the latest message in a log segment, perhaps log rolling should be based on that too.

I am not sure how to make log rolling work with the latest timestamp in the current log segment. Do you mean log rolling could be based on the last log segment's latest timestamp? If so, how do we roll the first segment?

> 4. In KIP-33, I presume the timestamps in the time index will be monotonically increasing. So, if all messages in a log segment have a timestamp less than the largest timestamp in the previous log segment, we will use the latter to index this log segment?

Yes, the timestamps are monotonically increasing. If the largest timestamp in the previous segment is very big, it is possible that the time index of the current segment has only two entries (inserted at segment creation and at roll), both pointing to a message in the previous log segment. This is the corner case I mentioned before, where we should expire the next log segment even before expiring the previous one, just because the largest timestamp is in the previous segment. In the current approach, we wait until the previous log segment expires and then delete both the previous and the next log segment.

> 5. In KIP-32, in the wire protocol, we mention both timestamp and time. They should be consistent.
Will fix the wiki page.

> Jun

On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin <becket....@gmail.com> wrote:

Hey Jay,

Thanks for the comments.

Good point about the action taken when max.message.time.difference is exceeded. Rejection is a useful behavior, although I cannot think of a use case at LinkedIn at this moment. I think it makes sense to add a configuration.

How about the following configurations?

1. message.timestamp.type=CreateTime/LogAppendTime
2. max.message.time.difference.ms

If message.timestamp.type is set to CreateTime, when the broker receives a message it will further check max.message.time.difference.ms and reject the message if the time difference exceeds the threshold. If message.timestamp.type is set to LogAppendTime, the broker will always stamp the message with the current server time, regardless of the value of max.message.time.difference.ms.

This makes sure the messages on the broker carry either CreateTime or LogAppendTime, but not a mixture of both.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

That summary of pros and cons sounds about right to me.

There are potentially two actions you could take when max.message.time.difference is exceeded: override the timestamp, or reject the message entirely. Can we pick one of these, or does the action need to be configurable too? (I'm not sure.) The downside of more configuration is that it is more fiddly and has more modes.

I suppose the reason I was thinking of this as a "difference" rather than a hard type was that if you were going to go the reject route you would need some tolerance setting (i.e., if your SLA is that a timestamp off by more than 10 minutes gets an error). I agree with you that having one field that potentially contains a mix of two values is a bit weird.

-Jay
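A sketch of how the two configurations Becket proposes could interact on the broker's append path; the names are illustrative, not the actual broker code:

public class BrokerTimestampPolicy {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    static class MessageRejectedException extends RuntimeException {
        MessageRejectedException(String m) { super(m); }
    }

    // Returns the timestamp to append with, or throws if the message is rejected.
    static long apply(TimestampType type, long maxDifferenceMs,
                      long messageTimestampMs, long brokerNowMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // Always stamp with server time; the difference threshold is ignored.
            return brokerNowMs;
        }
        // CreateTime: accept only timestamps within the configured difference.
        if (Math.abs(brokerNowMs - messageTimestampMs) > maxDifferenceMs) {
            throw new MessageRejectedException("timestamp too far from broker time");
        }
        return messageTimestampMs;
    }
}

This keeps a topic's log free of mixed timestamp types: every stored message carries either CreateTime or LogAppendTime, never a mixture.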
On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin <becket....@gmail.com> wrote:

It looks like the format of the previous email was messed up. Sending it again.

Just to recap, the last proposal Jay made (with some implementation details added) was:

1. Allow the user to stamp the message at produce time.

2. When the broker receives a message, it takes a look at the difference between its local time and the timestamp in the message.
a. If the time difference is within a configurable max.message.time.difference.ms, the server will accept it and append it to the log.
b. If the time difference is beyond the configured max.message.time.difference.ms, the server will override the timestamp with its current local time and append the message to the log.
c. The default value of max.message.time.difference would be set to Long.MaxValue.

3. The configurable time difference threshold max.message.time.difference.ms will be a per-topic configuration.

4. The index will be built so that it has the following guarantees.
a. If the user searches by timestamp (see the sketch after this message):
- all the messages after that timestamp will be consumed;
- the user might see earlier messages.
b. Log retention will look at the last entry in the time index file. Because the last entry holds the latest timestamp in the entire log segment, the log segment can be deleted once that entry expires.
c. Log rolling has to depend on the earliest timestamp. In this case we may need to keep an in-memory timestamp only for the current active log. On recovery, we will need to read the active log segment to get the timestamp of the earliest message.
5. The downsides of this proposal are:
a. The timestamps might not be monotonically increasing.
b. Log retention might become non-deterministic, i.e., when a message will be deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by the user within a range that depends on the time difference threshold configuration.
c. The semantic meaning of the timestamp in the messages could be a little vague, because some of them come from the producer and some of them are overwritten by brokers.

6. Although the proposal has some downsides, it gives the user the flexibility to use the timestamp.
a. If the threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
b. If the threshold is set to 0, the timestamp in the message is equivalent to LogAppendTime.

This proposal actually allows the user to use either CreateTime or LogAppendTime without introducing two timestamp concepts at the same time. I have updated the wikis for KIP-32 and KIP-33 with this proposal.

One thing I am thinking is that instead of having a time difference threshold, should we simply have a TimestampType configuration? Because in most cases people will set the threshold to either 0 or Long.MaxValue. Setting anything in between makes the timestamp in the message meaningless to the user - the user doesn't know whether the timestamp has been overwritten by the brokers.

Any thoughts?

Thanks,

Jiangjie (Becket) Qin
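The guarantee in item 4a follows from the index being monotonic in both fields: a lookup returns the offset of the last entry stamped no later than t. A minimal sketch with a hypothetical TimeIndexEntry type:

import java.util.List;

public class TimestampSearch {
    record TimeIndexEntry(long timestamp, long offset) {}

    static long offsetForTimestamp(List<TimeIndexEntry> index, long t, long logStartOffset) {
        int lo = 0, hi = index.size() - 1;
        long result = logStartOffset; // nothing indexed at or before t: scan from the start
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.get(mid).timestamp() <= t) {
                result = index.get(mid).offset(); // candidate; look for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return result;
    }
}

Since each entry's timestamp is the largest seen up to its offset, every message created after t sits at or beyond the returned offset, while earlier-stamped messages may also appear, exactly as item 4a states.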
On Mon, Dec 7, 2015 at 10:33 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Bump up this thread.

Just to recap, the last proposal Jay made (with some implementation details added) was:

1. Allow the user to stamp the message at produce time.
2. When the broker receives a message, it takes a look at the difference between its local time and the timestamp in the message.
- If the time difference is within a configurable max.message.time.difference.ms, the server will accept it and append it to the log.
- If the time difference is beyond the configured max.message.time.difference.ms, the server will override the timestamp with its current local time and append the message to the log.
- The default value of max.message.time.difference would be set to Long.MaxValue.
3. The configurable time difference threshold max.message.time.difference.ms will be a per-topic configuration.
4. The index will be built so that it has the following guarantees.
- If the user searches by timestamp:
- all the messages after that timestamp will be consumed;
- the user might see earlier messages.
- Log retention will look at the last entry in the time index file. Because the last entry holds the latest timestamp in the entire log segment, the log segment can be deleted once that entry expires.
- Log rolling has to depend on the earliest timestamp. In this case we may need to keep an in-memory timestamp only for the current active log. On recovery, we will need to read the active log segment to get the timestamp of the earliest message.
5. The downsides of this proposal are:
- The timestamps might not be monotonically increasing.
- Log retention might become non-deterministic, i.e., when a message will be deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by the user within a range that depends on the time difference threshold configuration.
- The semantic meaning of the timestamp in the messages could be a little vague, because some of them come from the producer and some of them are overwritten by brokers.
6. Although the proposal has some downsides, it gives the user the flexibility to use the timestamp.
- If the threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
- If the threshold is set to 0, the timestamp in the message is equivalent to LogAppendTime.

This proposal actually allows the user to use either CreateTime or LogAppendTime without introducing two timestamp concepts at the same time. I have updated the wikis for KIP-32 and KIP-33 with this proposal.

One thing I am thinking is that instead of having a time difference threshold, should we simply have a TimestampType configuration? Because in most cases people will set the threshold to either 0 or Long.MaxValue. Setting anything in between makes the timestamp in the message meaningless to the user - the user doesn't know whether the timestamp has been overwritten by the brokers.

Any thoughts?

Thanks,

Jiangjie (Becket) Qin

On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi Jay,

Thanks for such a detailed explanation. I think we are both trying to make CreateTime work for us if possible. To me, "work" means clear guarantees on:

1. Log retention time enforcement.
2. Log rolling time enforcement (this might be less of a concern, as you pointed out).
3. Applications searching for messages by time.

WRT (1), I agree the expectation for log retention might be different depending on who we ask. But my concern is the level of guarantee we give to the user. My observation is that a clear guarantee to the user is critical regardless of the mechanism we choose, and this is the subtle but important difference between using LogAppendTime and CreateTime.
Let's say the user asks this question: how long will my message stay in Kafka?

If we use LogAppendTime for log retention, the answer is that the message will stay in Kafka for the retention time after the message is produced (to be more precise, upper bounded by log.rolling.ms + log.retention.ms). The user has a clear guarantee, and they may decide whether or not to put the message into Kafka, or how to adjust the retention time according to their requirements. If we use create time for log retention, the answer is "it depends". The best answer we can give is "at least retention.ms", because there is no guarantee when the messages will be deleted after that. If a message sits somewhere behind a larger create time, the message might stay longer than expected, and we don't know how much longer because it depends on that create time. In this case it is hard for the user to decide what to do.

I am worried about this because a blurry guarantee has bitten us before, e.g. topic creation. We have received many questions like "why is my topic not there after I created it", and I can imagine we would receive similar questions asking "why is my message still there after the retention time has been reached". So my understanding is that a clear and solid guarantee is better than a mechanism that works in most cases but occasionally does not.

If we think of the retention guarantee we provide with LogAppendTime, it is not broken as you said, because we tell the user in the first place that log retention is NOT based on create time.
WRT (3), no matter whether we index on LogAppendTime or CreateTime, the best guarantee we can provide to the user is "no missing messages after a certain timestamp". Therefore I would actually really like to index on CreateTime, because that is the timestamp we provide to the user, and we can give the solid guarantee. On the other hand, indexing on LogAppendTime while giving the user CreateTime does not provide a solid guarantee when the user searches by timestamp; it only works when LogAppendTime is always no earlier than CreateTime. That is a reasonable assumption, and we can easily enforce it.

Given the above, I am not sure we can avoid a server timestamp if log retention is to work with a clear guarantee. For the search-by-timestamp use case, I really want to have the index built on CreateTime, but with a reasonable assumption and timestamp enforcement, a LogAppendTime index would also work.

Thanks,

Jiangjie (Becket) Qin

On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

Let me see if I can address your concerns:

> 1. Let's say we have two source clusters that are mirrored to the same target cluster. For some reason one of the mirror makers from a cluster dies, and after fixing the issue we want to resume mirroring. In this case it is possible that when the mirror maker resumes mirroring, the timestamps of the messages have already gone beyond the acceptable timestamp range on the broker.
> In order to let those messages go through, we have to bump up *max.append.delay* for all the topics on the target broker. This could be painful.

Actually what I was suggesting was different. Here is my observation: clusters/topics directly produced to by applications have a valid assertion that log append time and create time are similar (let's call these "unbuffered"); other clusters/topics, such as those that receive data from a database, a log file, or another Kafka cluster, don't have that assertion - for these "buffered" clusters data can be arbitrarily late. This means any use of log append time on these buffered clusters is not very meaningful, while create time and log append time "should" be similar on unbuffered clusters, so there you can probably use either.

Using log append time on buffered clusters actually results in bad things. If you request the offset for a given time, you don't end up getting data for that time, but rather data that showed up at that time. If you try to retain 7 days of data it may mostly work, but any kind of bootstrapping will result in retaining much more (potentially the whole database contents!).

So what I am suggesting in terms of the use of max.append.delay is that unbuffered clusters would have this set and buffered clusters would not. In other words, in LI terminology, tracking and metrics clusters would have this enforced; aggregate and replica clusters wouldn't.
So you DO have the issue of potentially maintaining more data than you need on aggregate clusters if your mirroring skews, but you DON'T need to tweak the setting as you described.

> 2. Let's say in the above scenario we let the messages in. At that point some log segments in the target cluster might have a wide range of timestamps. As Guozhang mentioned, log rolling could be tricky because the first time index entry does not necessarily have the smallest timestamp of all the messages in the log segment; instead, it is the largest timestamp ever seen. We would have to scan the entire log to find the smallest timestamp to see if we should roll.

I think there are two uses for time-based log rolling:
1. Making offset lookup by timestamp work.
2. Ensuring we don't retain data indefinitely if it is supposed to get purged after 7 days.

But think about these two use cases. (1) is totally obviated by the time=>offset index we are adding, which yields much more granular offset lookups. (2) is actually totally broken if you switch to append time, right? If, for security/privacy reasons, you want to be sure you retain only 7 days of data, then you actually violate this requirement whenever log append time and create time diverge.

I think 95% of people care about (1), which is solved in the proposal, and (2) is actually broken today as well as in both proposals.

> 3. Theoretically it is possible that an older log segment contains timestamps that are newer than all the messages in a newer log segment.
> It would be weird that we are supposed to delete the newer log segment before we delete the older log segment.

The index timestamps would always be a lower bound (i.e. the maximum at that time), so I don't think that is possible.

> 4. In the bootstrap case, if we reload data into a Kafka cluster, we have to make sure we configure the topic correctly before we load the data. Otherwise the messages might either be rejected because the timestamps are too old, or be deleted immediately because the retention time has been reached.

See (1).

-Jay

On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hey Jay and Guozhang,

Thanks a lot for the reply. So if I understand correctly, Jay's proposal is:

1. Let the client stamp the message create time.
2. The broker builds the index based on the client-stamped message create time.
3. The broker only takes messages whose create time is within current time plus/minus T (T is a configuration, *max.append.delay*, and could be a topic-level configuration); if the timestamp is out of this range, the broker rejects the message.
4. Because the create time of messages can be out of order, when the broker builds the time-based index it only provides the guarantee that if a consumer starts consuming from the offset returned by searching for timestamp t, it will not miss any message created after t, but might see some messages created before t.

To build the time-based index, every time the broker needs to insert a new time index entry, the entry would be {Largest_Timestamp_Ever_Seen -> Current_Offset}. This basically means that any timestamp larger than Largest_Timestamp_Ever_Seen must come after this offset, because the broker never saw such a timestamp before. So we don't miss any message with a larger timestamp.

(@Guozhang, in this case, for log retention we only need to look at the last time index entry, because it must hold the largest timestamp ever seen; if that timestamp is overdue, we can safely delete any log segment before it. So we don't need to scan the log segment file for log retention.)
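The rule just described, where each new entry records the largest timestamp seen so far, keeps the index monotonic even when create times arrive out of order. A sketch with hypothetical types, not the KIP-33 implementation:

import java.util.ArrayList;
import java.util.List;

public class TimeIndexBuilder {
    record TimeIndexEntry(long timestamp, long offset) {}

    private final List<TimeIndexEntry> entries = new ArrayList<>();
    private long largestTimestampSeen = Long.MIN_VALUE;

    // Called for each appended message; writes an entry at index-interval boundaries.
    void onAppend(long messageTimestampMs, long messageOffset, boolean intervalReached) {
        largestTimestampSeen = Math.max(largestTimestampSeen, messageTimestampMs);
        if (intervalReached) {
            // Any message stamped later than largestTimestampSeen must live at a
            // higher offset, so a search by timestamp can never miss it.
            entries.add(new TimeIndexEntry(largestTimestampSeen, messageOffset));
        }
    }

    List<TimeIndexEntry> entries() { return entries; }
}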
I assume that we are still going to have the new FetchRequest to allow time index replication for replicas.

I think Jay's main point here is that we don't want to have two timestamp concepts in Kafka, which I agree is a reasonable concern. And I also agree that create time is more meaningful than LogAppendTime for users. But I am not sure basing everything on create time would work in all cases. Here are my questions about this approach:

1. Let's say we have two source clusters that are mirrored to the same target cluster. For some reason one of the mirror makers from a cluster dies, and after fixing the issue we want to resume mirroring. In this case it is possible that when the mirror maker resumes mirroring, the timestamps of the messages have already gone beyond the acceptable timestamp range on the broker. In order to let those messages go through, we have to bump up *max.append.delay* for all the topics on the target broker. This could be painful.

2. Let's say in the above scenario we let the messages in. At that point some log segments in the target cluster might have a wide range of timestamps. As Guozhang mentioned, log rolling could be tricky because the first time index entry does not necessarily have the smallest timestamp of all the messages in the log segment; instead, it is the largest timestamp ever seen. We would have to scan the entire log to find the smallest timestamp to see if we should roll.

3. Theoretically it is possible that an older log segment contains timestamps that are newer than all the messages in a newer log segment. It would be weird that we are supposed to delete the newer log segment before we delete the older log segment.

4. In the bootstrap case, if we reload data into a Kafka cluster, we have to make sure we configure the topic correctly before we load the data.
Otherwise the messages might either be rejected because the timestamp is too old, or be deleted immediately because the retention time has been reached.

I am very concerned about the operational overhead and the ambiguity of guarantees we introduce if we purely rely on CreateTime.

It looks to me that the biggest issue with adopting CreateTime everywhere is that CreateTime can have big gaps. These gaps could be caused by several cases:
[1]. Faulty clients
[2]. Natural delays from different sources
[3]. Bootstrap
[4]. Failure recovery

Jay's alternative proposal solves [1], and perhaps solves [2] as well if we are able to set a reasonable max.append.delay. But it does not seem to address [3] and [4]. I actually doubt [3] and [4] are solvable, because it looks like the CreateTime gap is unavoidable in those two cases.

Thanks,

Jiangjie (Becket) Qin
On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Just to complete Jay's option, here is my understanding:

1. For log retention: if we want to remove data before time t, we look into the index file of each segment and find the largest timestamp t' < t, find the corresponding offset, and start scanning to the end of the segment. If there is no entry with timestamp >= t, we can delete this segment; if a segment's smallest index timestamp is larger than t, we can skip that segment entirely.

2. For log rolling: if we want to start a new segment after time t, we look into the active segment's index file. If the largest timestamp is already > t, we can roll a new segment immediately; if it is < t, we read its corresponding offset and start scanning to the end of the segment, and if we find a record whose timestamp > t, we can roll a new segment.

For log rolling we only need to possibly scan a small portion of the active segment, which should be fine; for log retention we may in the worst case end up scanning all segments, but in practice we can skip most of them since their smallest timestamp in the index file is larger than t.

Guozhang
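In rough code, the two checks might look like this sketch (the Segment interface is a hypothetical stand-in for reading a segment's time index and scanning its records; none of this is Kafka's real API):

    // Sketch of the retention and rolling checks described above.
    class TimeBasedChecksSketch {
        interface Segment {
            long smallestIndexedTimestamp();        // first time index entry
            long largestIndexedTimestamp();         // last time index entry
            // Record timestamps from the offset of the largest indexed
            // timestamp t' < t, through the end of the segment.
            Iterable<Long> timestampsFrom(long t);
        }

        // Retention: delete the segment only if no record has timestamp >= t.
        static boolean canDelete(Segment segment, long t) {
            if (segment.smallestIndexedTimestamp() > t) {
                return false; // everything here is newer than t; skip it
            }
            for (long ts : segment.timestampsFrom(t)) {
                if (ts >= t) return false;
            }
            return true;
        }

        // Rolling: roll if any record in the active segment has timestamp > t.
        static boolean shouldRoll(Segment active, long t) {
            if (active.largestIndexedTimestamp() > t) return true;
            for (long ts : active.timestampsFrom(t)) {
                if (ts > t) return true;
            }
            return false;
        }
    }

The early-exit cases are what keep the common path cheap: most segments are skipped outright for retention, and rolling usually stops at the index check.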
On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <j...@confluent.io> wrote:

I think it should be possible to index out-of-order timestamps. The timestamp index would be similar to the offset index, a memory mapped file appended to as part of the log append, but would have the format

    timestamp offset

The timestamp entries would be monotonic and, as with the offset index, would be no more often than every 4k (or some configurable threshold to keep the index small--actually for timestamps it could probably be much more sparse than 4k).

A search for a timestamp t yields an offset o before which no prior message has a timestamp >= t. In other words, if you read the log starting at o you are guaranteed not to miss any messages occurring at t or later, though you may get many before t (due to out-of-orderness). Unlike the offset index this bound doesn't really have to be tight (i.e. probably no need to go search the log itself, though you could).

-Jay
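As a sketch of that search contract, with a TreeMap standing in for the memory mapped file (illustrative names, not from the KIP):

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Sketch of the search over sparse, monotonic {timestamp -> offset}
    // entries. The returned offset is a safe, not necessarily tight,
    // starting point: no message with timestamp >= t appears before it.
    class TimestampIndexSearchSketch {
        private final NavigableMap<Long, Long> index = new TreeMap<>();

        void addEntry(long maxTimestampSoFar, long offset) {
            index.put(maxTimestampSoFar, offset); // keys only ever grow
        }

        long offsetFor(long t) {
            Map.Entry<Long, Long> e = index.lowerEntry(t); // largest key < t
            return e == null ? 0L : e.getValue();
        }
    }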
On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <j...@confluent.io> wrote:

Here's my basic take:
- I agree it would be nice to have a notion of time baked in if it were done right
- All the proposals so far seem pretty complex--I think they might make things worse rather than better overall
- I think adding 2x8 byte timestamps to the message is probably a non-starter from a size perspective
- Even if it isn't in the message, having two notions of time that control different things is a bit confusing
- The mechanics of basing retention etc. on log append time when that's not in the log seem complicated

To that end here is a possible 4th option. Let me know what you think.

The basic idea is that the message creation time is closest to what the user actually cares about but is dangerous if set wrong. So rather than substitute another notion of time, let's try to ensure the correctness of message creation time by preventing arbitrarily bad message creation times.

First, let's see if we can agree that log append time is not something anyone really cares about but rather an implementation detail. The timestamp that matters to the user is when the message occurred (the creation time).
The log append time is basically just an approximation to this, on the assumption that the message creation and the message receive on the server occur pretty close together, and the reason to prefer .

But as these values diverge the issue starts to become apparent. Say you set the retention to one week and then mirror data from a topic containing two years of retention. Your intention is clearly to keep the last week, but because the mirroring is appending right now, you will keep two years.

The reason we like log append time is that we are (justifiably) concerned that in certain situations the creation time may not be trustworthy. This same problem exists on the servers, but there are fewer servers and they just run the Kafka code, so it is less of an issue.

There are two possible ways to handle this:

1. Just tell people to add size based retention. I think this is not entirely unreasonable; we're basically saying we retain data based on the timestamp you give us in the data. If you give us bad data, we will retain it for a bad amount of time.
If you want to ensure we don't retain "too much" data, define "too much" by setting a time-based retention setting. This is not entirely unreasonable, but it kind of suffers from a "one bad apple" problem in a very large environment.

2. Prevent bad timestamps. In general we can't say a timestamp is bad. However, the definition we're implicitly using is that we think there is a set of topics/clusters where the creation timestamp should always be "very close" to the log append timestamp. This is true for data sources that have no buffering capability (which at LinkedIn is very common, but is more rare elsewhere). The solution in this case would be to allow a setting along the lines of max.append.delay which checks the creation timestamp against the server time to look for too large a divergence. The solution would either be to reject the message or to override it with the server time.

So in LI's environment you would configure the clusters used for direct, unbuffered message production (e.g. tracking and metrics local) to enforce a reasonably aggressive timestamp bound (say 10 mins), and all other clusters would just inherit these.
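A sketch of what that check might look like broker-side (max.append.delay is the setting proposed above; the class and policy names are mine):

    // Sketch of the proposed max.append.delay guard: if the creation time
    // diverges too far from the server clock, either reject the message or
    // override its timestamp with the server time.
    class MaxAppendDelaySketch {
        enum Policy { REJECT, OVERRIDE }

        private final long maxAppendDelayMs;
        private final Policy policy;

        MaxAppendDelaySketch(long maxAppendDelayMs, Policy policy) {
            this.maxAppendDelayMs = maxAppendDelayMs;
            this.policy = policy;
        }

        // Returns the timestamp to write into the log, or throws on reject.
        long validate(long createTime, long serverTime) {
            if (Math.abs(serverTime - createTime) <= maxAppendDelayMs) {
                return createTime; // within bound: creation time is trusted
            }
            if (policy == Policy.OVERRIDE) {
                return serverTime; // too divergent: fall back to server clock
            }
            throw new IllegalArgumentException(
                "create time differs from server time by more than max.append.delay");
        }
    }

On a direct-production cluster the bound would be tight (say 10 minutes); elsewhere it could be left effectively unbounded.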
The downside of this approach is requiring the special configuration. However, I think in 99% of environments this could be skipped entirely; it's only when the ratio of clients to servers gets massive that you need to do this. The primary upside is that you have a single authoritative notion of time which is closest to what a user would want and is stored directly in the message.

I'm also assuming there is a workable approach for indexing non-monotonic timestamps, though I haven't actually worked that out.

-Jay

On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Bumping up this thread, although most of the discussion was on the discussion thread of KIP-31 :)

I just updated the KIP page to add a detailed solution for the option (Option 3) that does not expose the LogAppendTime to the user.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message

The option has a minor change to the fetch request to allow fetching time index entries as well. I kind of like this solution because it's just doing what we need without introducing other things.
It will be great to see what the feedback is. I can explain more during tomorrow's KIP hangout.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi Jay,

I just copy/pasted here your feedback on the timestamp proposal from the discussion thread of KIP-31. Please see the replies inline. The main change I made compared with the previous proposal is to add both CreateTime and LogAppendTime to the message.

On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Beckett,
>
> I was proposing splitting up the KIP just for simplicity of discussion. You can still implement them in one patch. I think otherwise it will be hard to discuss/vote on them, since if you like the offset proposal but not the time proposal what do you do?
>
> Introducing a second notion of time into Kafka is a pretty massive philosophical change, so it kind of warrants its own KIP; it isn't just "Change message format".
> WRT time, I think one thing to clarify in the proposal is how MM will have access to set the timestamp. Presumably this will be a new field in ProducerRecord, right? If so, then any user can set the timestamp, right? I'm not sure you answered the questions around how this will work for MM, since when MM retains timestamps from multiple partitions they will then be out of order and in the past (so the max(lastAppendedTimestamp, currentTimeMillis) override you proposed will not work, right?). If we don't do this, then when you set up mirroring the data will all be new and you have the same retention problem you described. Maybe I missed something...?

lastAppendedTimestamp means the timestamp of the message that was last appended to the log. If a broker is a leader, since it assigns the timestamp by itself, the lastAppendedTimestamp will be its local clock when it appended the last message. So if there is no leader migration, max(lastAppendedTimestamp, currentTimeMillis) = currentTimeMillis.
If a broker is a follower, because it keeps the leader's timestamp unchanged, the lastAppendedTime would be the leader's clock when it appended that message. The follower keeps track of the lastAppendedTime only in case it becomes leader later on. At that point, it is possible that the timestamp of the last appended message was stamped by the old leader, but the new leader's currentTimeMillis < lastAppendedTime. If a new message comes, instead of stamping it with the new leader's currentTimeMillis, we have to stamp it with lastAppendedTime to avoid the timestamps in the log going backward. The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the broker side clock. If MM produces messages with different LogAppendTimes from source clusters to the same target cluster, the LogAppendTime will be ignored and re-stamped by the target cluster. I added a use case example for mirror maker in KIP-32. There is also a corner case discussion about when we need the max(lastAppendedTime, currentTimeMillis) trick. Could you take a look and see if that answers your question?
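The stamping rule itself is small; roughly (a sketch with illustrative names, based on the description above):

    // Sketch of the max(lastAppendedTimestamp, currentTimeMillis) rule: the
    // leader stamps with its clock, but never lets log timestamps go
    // backward after a leader migration to a broker with a slower clock.
    class LogAppendTimeSketch {
        private long lastAppendedTimestamp = Long.MIN_VALUE;

        // Called by the leader for each newly appended message.
        synchronized long stamp() {
            lastAppendedTimestamp =
                Math.max(lastAppendedTimestamp, System.currentTimeMillis());
            return lastAppendedTimestamp;
        }

        // Called by a follower: it keeps the leader's timestamp unchanged,
        // but remembers the largest one in case it becomes the leader later.
        synchronized void observe(long leaderTimestamp) {
            lastAppendedTimestamp = Math.max(lastAppendedTimestamp, leaderTimestamp);
        }
    }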
> My main motivation is that given that both Samza and Kafka streams are doing work that implies a mandatory client-defined notion of time, I really think introducing a different mandatory notion of time in Kafka is going to be quite odd. We should think hard about how client-defined time could work. I'm not sure if it can, but I'm also not sure that it can't. Having both will be odd. Did you chat about this with Yi/Kartik on the Samza side?

I talked with Kartik and realized that it would be useful to have a client timestamp to facilitate use cases like stream processing. I was trying to figure out if we can simply use the client timestamp without introducing the server time. There is some discussion of this in the KIP.
The key problems we want to solve here are:
1. We want log retention and rolling to depend on the server clock.
2. We want the log-associated timestamp to be retained when replicas move.
3. We want to use the timestamp in some way that allows searching by timestamp.
For 1 and 2, an alternative is to pass the log-associated timestamp through replication; that means we need a different protocol for replica fetching to pass the log-associated timestamp. It is actually complicated and there could be a lot of corner cases to handle, e.g. if an old leader starts to fetch from the new leader, should it also update all of its old log segments' timestamps?
I think client side timestamps would actually be better for 3 if we can find a way to make them work.
So far I am not able to convince myself that only having a client side timestamp would work, mainly because of 1 and 2. There are a few situations I mentioned in the KIP.

> When you are saying it won't work, are you assuming some particular implementation? Maybe that the index is a monotonically increasing set of pointers to the least record with a timestamp larger than the index time? In other words, a search for time X gives the largest offset at which all records are <= X?

It is a promising idea. We probably can have an in-memory index like that, but it might be complicated to have a file on disk like that. Imagine there are two timestamps T0 < T1.
We see message Y created at T1 and create an index like [T1->Y]; then we see message X created at T0. Supposedly we should have the index look like [T0->X, T1->Y]. That is easy to do in memory, but we might have to rewrite the index file completely. Maybe we can have the first entry with its timestamp set to 0, and only update that first pointer for any out-of-range timestamp, so the index will be [0->X, T1->Y]. Also, the ranges of timestamps in the log segments can overlap with each other. That means we either need to keep a cross-segment index file or we need to check the index file of every log segment.
I separated the time based log index out into KIP-33 because it can be an independent follow-up feature, as Neha suggested. I will try to make the time based index work with client side timestamps.
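One possible reading of that first-entry trick, as a sketch (illustrative only; it assumes offsets arrive in increasing order):

    import java.util.TreeMap;

    // Sketch of the [0->X, T1->Y] workaround: instead of rewriting the
    // index when an out-of-order timestamp arrives, fold it into a single
    // catch-all first entry keyed at timestamp 0, keeping the earliest
    // such offset so a search never misses the out-of-order messages.
    class FirstEntrySketch {
        private final TreeMap<Long, Long> index = new TreeMap<>();

        void add(long timestamp, long offset) {
            if (!index.isEmpty() && timestamp < index.lastKey()) {
                // Out-of-order timestamp: update only the catch-all entry;
                // since offsets increase, the first offset recorded stays.
                index.putIfAbsent(0L, offset);
            } else {
                index.put(timestamp, offset);
            }
        }
    }

With the [T1->Y] example above, adding X at T0 folds into the catch-all entry and yields [0->X, T1->Y] without touching the existing entry.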
> For retention, I agree with the problem you point out, but I think what you are saying in that case is that you want a size limit too. If you use system time you actually hit the same problem: say you do a full dump of a DB table with a setting of 7 days retention; your retention will actually not get enforced for the first 7 days because the data is "new to Kafka".

I kind of think the size limit here is orthogonal. It is a valid use case where people want to use time based retention only. In your example, depending on the client timestamp might break the functionality - say it is a bootstrap case where people actually need to read all the data. If we depend on the client timestamp, the data might be deleted instantly when it comes to the broker. It might be too demanding to expect the broker to understand what people actually want to do with the data coming in. So the guarantee of using the server side timestamp is that "after being appended to the log, all messages will be available on the broker for the retention time", which is not changeable by clients.
> -Jay

On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi folks,

This proposal was previously in KIP-31 and we separated it into KIP-32 per Neha and Jay's suggestion.

The proposal is to add the following two timestamps to the Kafka message:
- CreateTime
- LogAppendTime

The CreateTime will be set by the producer and will not change after that. The LogAppendTime will be set by the broker for purposes such as enforcing log retention and log rolling.

Thanks,

Jiangjie (Becket) Qin

--
-- Guozhang