My hesitation for the changed protocol is that I think If we will have
topic configuration returned in the topic metadata, the current protocol
makes more sense. Because the timestamp type is a topic level setting so we
don't need to put it into each message. That is assuming the timestamp type
change on a topic rarely happens and if it is ever needed, the existing
data should be wiped out.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 2:07 PM, Becket Qin <becket....@gmail.com> wrote:

> Bump up this thread per discussion on the KIP hangout.
>
> During the implementation of the KIP, Guozhang raised another proposal on
> how to indicate the message timestamp type used by messages. So we want to
> see people's opinion on this proposal.
>
> The difference between current and the new proposal only differs on
> messages that are a) compressed, and b) using LogAppendTime
>
> For compressed messages using LogAppendTime, the timestamps in the current
> proposal is as below:
> 1. When a producer produces the messages, it tries to set timestamp to -1
> for inner messages if it knows LogAppendTime is used.
> 2. When a broker receives the messages, it will overwrite the timestamp of
> inner message to -1 if needed and write server time to the wrapper message.
> Broker will do re-compression if inner message timestamp is overwritten.
> 3. When a consumer receives the messages, it will see the inner message
> timestamp is -1 so the wrapper message timestamp is used.
>
> Implementation wise, this proposal requires the producer to set timestamp
> for inner messages correctly to avoid broker side re-compression. To do
> that, the short term solution is to let producer infer the timestamp type
> returned by broker in ProduceResponse and set correct timestamp afterwards.
> This means the first few batches will still need re-compression on the
> broker. The long term solution is to have producer get topic configuration
> during metadata update.
>
>
> The proposed modification is:
> 1. When a producer produces the messages, it always using create time.
> 2. When a broker receives the messages, it ignores the inner messages
> timestamp, but simply set a wrapper message timestamp type attribute bit to
> 1 and set the timestamp of the wrapper message to server time. (The broker
> will also set the timesatmp type attribute bit accordingly for
> non-compressed messages using LogAppendTime).
> 3. When a consumer receives the messages, it checks timestamp type
> attribute bit of wrapper message. If it is set to 1, the inner message's
> timestamp will be ignored and the wrapper message's timestamp will be used.
>
> This approach uses an extra attribute bit. The good thing of the modified
> protocol is consumers will be able to know the timestamp type. And
> re-compression on broker side is completely avoided no matter what value is
> sent by the producer. In this approach the inner messages will have wrong
> timestamps.
>
> We want to see if people have concerns over the modified approach.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
>
> On Tue, Dec 15, 2015 at 11:45 AM, Becket Qin <becket....@gmail.com> wrote:
>
>> Jun,
>>
>> 1. I agree it would be nice to have the timestamps used in a unified way.
>> My concern is that if we let server change timestamp of the inner message
>> for LogAppendTime, that will enforce the user who are using LogAppendTime
>> to always pay the recompression penalty. So using LogAppendTime makes
>> KIP-31 in vain.
>>
>> 4. If there are no entries in the log segment, we can read from the time
>> index before the previous log segment. If there is no previous entry
>> avaliable after we search until the earliest log segment, that means all
>> the previous log segment with a valid time index entry has been deleted. In
>> that case supposedly there should be only one log segment left - the active
>> log segment, we can simply set the latest timestamp to 0.
>>
>> Guozhang,
>>
>> Sorry for the confusion. by "the timestamp of the latest message" I
>> actually meant "the timestamp of the message with largest timestamp". So in
>> your example the "latest message" is 5.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On Tue, Dec 15, 2015 at 10:02 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>>> Jun, Jiangjie,
>>>
>>> I am confused about 3) here, if we use "the timestamp of the latest
>>> message"
>>> then doesn't this mean we will roll the log whenever a message delayed by
>>> rolling time is received as well? Just to clarify, my understanding of
>>> "the
>>> timestamp of the latest message", for example in the following log, is 1,
>>> not 5:
>>>
>>> 2, 3, 4, 5, 1
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, Dec 14, 2015 at 10:05 PM, Jun Rao <j...@confluent.io> wrote:
>>>
>>> > 1. Hmm, it's more intuitive if the consumer sees the same timestamp
>>> whether
>>> > the messages are compressed or not. When
>>> > message.timestamp.type=LogAppendTime,
>>> > we will need to set timestamp in each message if messages are not
>>> > compressed, so that the follower can get the same timestamp. So, it
>>> seems
>>> > that we should do the same thing for inner messages when messages are
>>> > compressed.
>>> >
>>> > 4. I thought on startup, we restore the timestamp of the latest
>>> message by
>>> > reading from the time index of the last log segment. So, what happens
>>> if
>>> > there are no index entries?
>>> >
>>> > Thanks,
>>> >
>>> > Jun
>>> >
>>> > On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin <becket....@gmail.com>
>>> wrote:
>>> >
>>> > > Thanks for the explanation, Jun.
>>> > >
>>> > > 1. That makes sense. So maybe we can do the following:
>>> > > (a) Set the timestamp in the compressed message to latest timestamp
>>> of
>>> > all
>>> > > its inner messages. This works for both LogAppendTime and CreateTime.
>>> > > (b) If message.timestamp.type=LogAppendTime, the broker will
>>> overwrite
>>> > all
>>> > > the inner message timestamp to -1 if they are not set to -1. This is
>>> > mainly
>>> > > for topics that are using LogAppendTime. Hopefully the producer will
>>> set
>>> > > the timestamp to -1 in the ProducerRecord to avoid server side
>>> > > recompression.
>>> > >
>>> > > 3. I see. That works. So the semantic of log rolling becomes "roll
>>> out
>>> > the
>>> > > log segment if it has been inactive since the latest message has
>>> > arrived."
>>> > >
>>> > > 4. Yes. If the largest timestamp is in previous log segment. The time
>>> > index
>>> > > for the current log segment does not have a valid offset in current
>>> log
>>> > > segment to point to. Maybe in that case we should build an empty log
>>> > index.
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jiangjie (Becket) Qin
>>> > >
>>> > >
>>> > >
>>> > > On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao <j...@confluent.io> wrote:
>>> > >
>>> > > > 1. I was thinking more about saving the decompression overhead in
>>> the
>>> > > > follower. Currently, the follower doesn't decompress the messages.
>>> To
>>> > > keep
>>> > > > it that way, the outer message needs to include the timestamp of
>>> the
>>> > > latest
>>> > > > inner message to build the time index in the follower. The simplest
>>> > thing
>>> > > > to do is to change the timestamp in the inner messages if
>>> necessary, in
>>> > > > which case there will be the recompression overhead. However, in
>>> the
>>> > case
>>> > > > when the timestamp of the inner messages don't have to be changed
>>> > > > (hopefully more common), there won't be the recompression
>>> overhead. In
>>> > > > either case, we always set the timestamp in the outer message to
>>> be the
>>> > > > timestamp of the latest inner message, in the leader.
>>> > > >
>>> > > > 3. Basically, in each log segment, we keep track of the timestamp
>>> of
>>> > the
>>> > > > latest message. If current time - timestamp of latest message > log
>>> > > rolling
>>> > > > interval, we roll a new log segment. So, if messages with later
>>> > > timestamps
>>> > > > keep getting added, we only roll new log segments based on size.
>>> On the
>>> > > > other hand, if no new messages are added to a log, we can force a
>>> log
>>> > > roll
>>> > > > based on time, which addresses the issue in (b).
>>> > > >
>>> > > > 4. Hmm, the index is per segment and should only point to
>>> positions in
>>> > > the
>>> > > > corresponding .log file, not previous ones, right?
>>> > > >
>>> > > > Thanks,
>>> > > >
>>> > > > Jun
>>> > > >
>>> > > >
>>> > > >
>>> > > > On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin <becket....@gmail.com>
>>> > > wrote:
>>> > > >
>>> > > > > Hi Jun,
>>> > > > >
>>> > > > > Thanks a lot for the comments. Please see inline replies.
>>> > > > >
>>> > > > > Thanks,
>>> > > > >
>>> > > > > Jiangjie (Becket) Qin
>>> > > > >
>>> > > > > On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao <j...@confluent.io>
>>> wrote:
>>> > > > >
>>> > > > > > Hi, Becket,
>>> > > > > >
>>> > > > > > Thanks for the proposal. Looks good overall. A few comments
>>> below.
>>> > > > > >
>>> > > > > > 1. KIP-32 didn't say what timestamp should be set in a
>>> compressed
>>> > > > > message.
>>> > > > > > We probably should set it to the timestamp of the latest
>>> messages
>>> > > > > included
>>> > > > > > in the compressed one. This way, during indexing, we don't
>>> have to
>>> > > > > > decompress the message.
>>> > > > > >
>>> > > > > That is a good point.
>>> > > > > In normal cases, broker needs to decompress the message for
>>> > > verification
>>> > > > > purpose anyway. So building time index does not add additional
>>> > > > > decompression.
>>> > > > > During time index recovery, however, having a timestamp in
>>> compressed
>>> > > > > message might save the decompression.
>>> > > > >
>>> > > > > Another thing I am thinking is we should make sure KIP-32 works
>>> well
>>> > > with
>>> > > > > KIP-31. i.e. we don't want to do recompression in order to add
>>> > > timestamp
>>> > > > to
>>> > > > > messages.
>>> > > > > Take the approach in my last email, the timestamp in the messages
>>> > will
>>> > > > > either all be overwritten by server if
>>> > > > > message.timestamp.type=LogAppendTime, or they will not be
>>> overwritten
>>> > > if
>>> > > > > message.timestamp.type=CreateTime.
>>> > > > >
>>> > > > > Maybe we can use the timestamp in compressed messages in the
>>> > following
>>> > > > way:
>>> > > > > If message.timestamp.type=LogAppendTime, we have to overwrite
>>> > > timestamps
>>> > > > > for all the messages. We can simply write the timestamp in the
>>> > > compressed
>>> > > > > message to avoid recompression.
>>> > > > > If message.timestamp.type=CreateTime, we do not need to
>>> overwrite the
>>> > > > > timestamps. We either reject the entire compressed message or We
>>> just
>>> > > > leave
>>> > > > > the compressed message timestamp to be -1.
>>> > > > >
>>> > > > > So the semantic of the timestamp field in compressed message
>>> field
>>> > > > becomes:
>>> > > > > if it is greater than 0, that means LogAppendTime is used, the
>>> > > timestamp
>>> > > > of
>>> > > > > the inner messages is the compressed message LogAppendTime. If
>>> it is
>>> > > -1,
>>> > > > > that means the CreateTime is used, the timestamp is in each
>>> > individual
>>> > > > > inner message.
>>> > > > >
>>> > > > > This sacrifice the speed of recovery but seems worthy because we
>>> > avoid
>>> > > > > recompression.
>>> > > > >
>>> > > > >
>>> > > > > > 2. In KIP-33, should we make the time-based index interval
>>> > > > configurable?
>>> > > > > > Perhaps we can default it 60 secs, but allow users to
>>> configure it
>>> > to
>>> > > > > > smaller values if they want more precision.
>>> > > > > >
>>> > > > > Yes, we can do that.
>>> > > > >
>>> > > > >
>>> > > > > > 3. In KIP-33, I am not sure if log rolling should be based on
>>> the
>>> > > > > earliest
>>> > > > > > message. This would mean that we will need to roll a log
>>> segment
>>> > > every
>>> > > > > time
>>> > > > > > we get a message delayed by the log rolling time interval.
>>> Also, on
>>> > > > > broker
>>> > > > > > startup, we can get the timestamp of the latest message in a
>>> log
>>> > > > segment
>>> > > > > > pretty efficiently by just looking at the last time index
>>> entry.
>>> > But
>>> > > > > > getting the timestamp of the earliest timestamp requires a full
>>> > scan
>>> > > of
>>> > > > > all
>>> > > > > > log segments, which can be expensive. Previously, there were
>>> two
>>> > use
>>> > > > > cases
>>> > > > > > of time-based rolling: (a) more accurate time-based indexing
>>> and
>>> > (b)
>>> > > > > > retaining data by time (since the active segment is never
>>> deleted).
>>> > > (a)
>>> > > > > is
>>> > > > > > already solved with a time-based index. For (b), if the
>>> retention
>>> > is
>>> > > > > based
>>> > > > > > on the timestamp of the latest message in a log segment,
>>> perhaps
>>> > log
>>> > > > > > rolling should be based on that too.
>>> > > > > >
>>> > > > > I am not sure how to make log rolling work with the latest
>>> timestamp
>>> > in
>>> > > > > current log segment. Do you mean the log rolling can based on the
>>> > last
>>> > > > log
>>> > > > > segment's latest timestamp? If so how do we roll out the first
>>> > segment?
>>> > > > >
>>> > > > >
>>> > > > > > 4. In KIP-33, I presume the timestamp in the time index will be
>>> > > > > > monotonically increasing. So, if all messages in a log segment
>>> > have a
>>> > > > > > timestamp less than the largest timestamp in the previous log
>>> > > segment,
>>> > > > we
>>> > > > > > will use the latter to index this log segment?
>>> > > > > >
>>> > > > > Yes. The timestamps are monotonically increasing. If the largest
>>> > > > timestamp
>>> > > > > in the previous segment is very big, it is possible the time
>>> index of
>>> > > the
>>> > > > > current segment only have two index entries (inserted during
>>> segment
>>> > > > > creation and roll out), both are pointing to a message in the
>>> > previous
>>> > > > log
>>> > > > > segment. This is the corner case I mentioned before that we
>>> should
>>> > > expire
>>> > > > > the next log segment even before expiring the previous log
>>> segment
>>> > just
>>> > > > > because the largest timestamp is in previous log segment. In
>>> current
>>> > > > > approach, we will wait until the previous log segment expires,
>>> and
>>> > then
>>> > > > > delete both the previous log segment and the next log segment.
>>> > > > >
>>> > > > >
>>> > > > > > 5. In KIP-32, in the wire protocol, we mention both timestamp
>>> and
>>> > > time.
>>> > > > > > They should be consistent.
>>> > > > > >
>>> > > > > Will fix the wiki page.
>>> > > > >
>>> > > > >
>>> > > > > > Jun
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > > > On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin <
>>> becket....@gmail.com
>>> > >
>>> > > > > wrote:
>>> > > > > >
>>> > > > > > > Hey Jay,
>>> > > > > > >
>>> > > > > > > Thanks for the comments.
>>> > > > > > >
>>> > > > > > > Good point about the actions after when
>>> > max.message.time.difference
>>> > > > is
>>> > > > > > > exceeded. Rejection is a useful behavior although I cannot
>>> think
>>> > of
>>> > > > use
>>> > > > > > > case at LinkedIn at this moment. I think it makes sense to
>>> add a
>>> > > > > > > configuration.
>>> > > > > > >
>>> > > > > > > How about the following configurations?
>>> > > > > > > 1. message.timestamp.type=CreateTime/LogAppendTime
>>> > > > > > > 2. max.message.time.difference.ms
>>> > > > > > >
>>> > > > > > > if message.timestamp.type is set to CreateTime, when the
>>> broker
>>> > > > > receives
>>> > > > > > a
>>> > > > > > > message, it will further check
>>> max.message.time.difference.ms,
>>> > and
>>> > > > > will
>>> > > > > > > reject the message it the time difference exceeds the
>>> threshold.
>>> > > > > > > If message.timestamp.type is set to LogAppendTime, the broker
>>> > will
>>> > > > > always
>>> > > > > > > stamp the message with current server time, regardless the
>>> value
>>> > of
>>> > > > > > > max.message.time.difference.ms
>>> > > > > > >
>>> > > > > > > This will make sure the message on the broker is either
>>> > CreateTime
>>> > > or
>>> > > > > > > LogAppendTime, but not mixture of both.
>>> > > > > > >
>>> > > > > > > What do you think?
>>> > > > > > >
>>> > > > > > > Thanks,
>>> > > > > > >
>>> > > > > > > Jiangjie (Becket) Qin
>>> > > > > > >
>>> > > > > > >
>>> > > > > > > On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps <j...@confluent.io>
>>> > > wrote:
>>> > > > > > >
>>> > > > > > > > Hey Becket,
>>> > > > > > > >
>>> > > > > > > > That summary of pros and cons sounds about right to me.
>>> > > > > > > >
>>> > > > > > > > There are potentially two actions you could take when
>>> > > > > > > > max.message.time.difference is exceeded--override it or
>>> reject
>>> > > the
>>> > > > > > > > message entirely. Can we pick one of these or does the
>>> action
>>> > > need
>>> > > > to
>>> > > > > > > > be configurable too? (I'm not sure). The downside of more
>>> > > > > > > > configuration is that it is more fiddly and has more modes.
>>> > > > > > > >
>>> > > > > > > > I suppose the reason I was thinking of this as a
>>> "difference"
>>> > > > rather
>>> > > > > > > > than a hard type was that if you were going to go the
>>> reject
>>> > mode
>>> > > > you
>>> > > > > > > > would need some tolerance setting (i.e. if your SLA is
>>> that if
>>> > > your
>>> > > > > > > > timestamp is off by more than 10 minutes I give you an
>>> error).
>>> > I
>>> > > > > agree
>>> > > > > > > > with you that having one field that is potentially
>>> containing a
>>> > > mix
>>> > > > > of
>>> > > > > > > > two values is a bit weird.
>>> > > > > > > >
>>> > > > > > > > -Jay
>>> > > > > > > >
>>> > > > > > > > On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin <
>>> > becket....@gmail.com
>>> > > >
>>> > > > > > wrote:
>>> > > > > > > > > It looks the format of the previous email was messed up.
>>> Send
>>> > > it
>>> > > > > > again.
>>> > > > > > > > >
>>> > > > > > > > > Just to recap, the last proposal Jay made (with some
>>> > > > implementation
>>> > > > > > > > > details added)
>>> > > > > > > > > was:
>>> > > > > > > > >
>>> > > > > > > > > 1. Allow user to stamp the message when produce
>>> > > > > > > > >
>>> > > > > > > > > 2. When broker receives a message it take a look at the
>>> > > > difference
>>> > > > > > > > between
>>> > > > > > > > > its local time and the timestamp in the message.
>>> > > > > > > > >   a. If the time difference is within a configurable
>>> > > > > > > > > max.message.time.difference.ms, the server will accept
>>> it
>>> > and
>>> > > > > append
>>> > > > > > > it
>>> > > > > > > > to
>>> > > > > > > > > the log.
>>> > > > > > > > >   b. If the time difference is beyond the configured
>>> > > > > > > > > max.message.time.difference.ms, the server will
>>> override the
>>> > > > > > timestamp
>>> > > > > > > > with
>>> > > > > > > > > its current local time and append the message to the log.
>>> > > > > > > > >   c. The default value of max.message.time.difference
>>> would
>>> > be
>>> > > > set
>>> > > > > to
>>> > > > > > > > > Long.MaxValue.
>>> > > > > > > > >
>>> > > > > > > > > 3. The configurable time difference threshold
>>> > > > > > > > > max.message.time.difference.ms will
>>> > > > > > > > > be a per topic configuration.
>>> > > > > > > > >
>>> > > > > > > > > 4. The indexed will be built so it has the following
>>> > guarantee.
>>> > > > > > > > >   a. If user search by time stamp:
>>> > > > > > > > >       - all the messages after that timestamp will be
>>> > consumed.
>>> > > > > > > > >       - user might see earlier messages.
>>> > > > > > > > >   b. The log retention will take a look at the last time
>>> > index
>>> > > > > entry
>>> > > > > > in
>>> > > > > > > > the
>>> > > > > > > > > time index file. Because the last entry will be the
>>> latest
>>> > > > > timestamp
>>> > > > > > in
>>> > > > > > > > the
>>> > > > > > > > > entire log segment. If that entry expires, the log
>>> segment
>>> > will
>>> > > > be
>>> > > > > > > > deleted.
>>> > > > > > > > >   c. The log rolling has to depend on the earliest
>>> timestamp.
>>> > > In
>>> > > > > this
>>> > > > > > > > case
>>> > > > > > > > > we may need to keep a in memory timestamp only for the
>>> > current
>>> > > > > active
>>> > > > > > > > log.
>>> > > > > > > > > On recover, we will need to read the active log segment
>>> to
>>> > get
>>> > > > this
>>> > > > > > > > timestamp
>>> > > > > > > > > of the earliest messages.
>>> > > > > > > > >
>>> > > > > > > > > 5. The downside of this proposal are:
>>> > > > > > > > >   a. The timestamp might not be monotonically increasing.
>>> > > > > > > > >   b. The log retention might become non-deterministic.
>>> i.e.
>>> > > When
>>> > > > a
>>> > > > > > > > message
>>> > > > > > > > > will be deleted now depends on the timestamp of the other
>>> > > > messages
>>> > > > > in
>>> > > > > > > the
>>> > > > > > > > > same log segment. And those timestamps are provided by
>>> > > > > > > > > user within a range depending on what the time difference
>>> > > > threshold
>>> > > > > > > > > configuration is.
>>> > > > > > > > >   c. The semantic meaning of the timestamp in the
>>> messages
>>> > > could
>>> > > > > be a
>>> > > > > > > > little
>>> > > > > > > > > bit vague because some of them come from the producer and
>>> > some
>>> > > of
>>> > > > > > them
>>> > > > > > > > are
>>> > > > > > > > > overwritten by brokers.
>>> > > > > > > > >
>>> > > > > > > > > 6. Although the proposal has some downsides, it gives
>>> user
>>> > the
>>> > > > > > > > flexibility
>>> > > > > > > > > to use the timestamp.
>>> > > > > > > > >   a. If the threshold is set to Long.MaxValue. The
>>> timestamp
>>> > in
>>> > > > the
>>> > > > > > > > message is
>>> > > > > > > > > equivalent to CreateTime.
>>> > > > > > > > >   b. If the threshold is set to 0. The timestamp in the
>>> > message
>>> > > > is
>>> > > > > > > > equivalent
>>> > > > > > > > > to LogAppendTime.
>>> > > > > > > > >
>>> > > > > > > > > This proposal actually allows user to use either
>>> CreateTime
>>> > or
>>> > > > > > > > LogAppendTime
>>> > > > > > > > > without introducing two timestamp concept at the same
>>> time. I
>>> > > > have
>>> > > > > > > > updated
>>> > > > > > > > > the wiki for KIP-32 and KIP-33 with this proposal.
>>> > > > > > > > >
>>> > > > > > > > > One thing I am thinking is that instead of having a time
>>> > > > difference
>>> > > > > > > > threshold,
>>> > > > > > > > > should we simply set have a TimestampType configuration?
>>> > > Because
>>> > > > in
>>> > > > > > > most
>>> > > > > > > > > cases, people will either set the threshold to 0 or
>>> > > > Long.MaxValue.
>>> > > > > > > > Setting
>>> > > > > > > > > anything in between will make the timestamp in the
>>> message
>>> > > > > > meaningless
>>> > > > > > > to
>>> > > > > > > > > user - user don't know if the timestamp has been
>>> overwritten
>>> > by
>>> > > > the
>>> > > > > > > > brokers.
>>> > > > > > > > >
>>> > > > > > > > > Any thoughts?
>>> > > > > > > > >
>>> > > > > > > > > Thanks,
>>> > > > > > > > > Jiangjie (Becket) Qin
>>> > > > > > > > >
>>> > > > > > > > > On Mon, Dec 7, 2015 at 10:33 AM, Jiangjie Qin
>>> > > > > > > <j...@linkedin.com.invalid
>>> > > > > > > > >
>>> > > > > > > > > wrote:
>>> > > > > > > > >
>>> > > > > > > > >> Bump up this thread.
>>> > > > > > > > >>
>>> > > > > > > > >> Just to recap, the last proposal Jay made (with some
>>> > > > > implementation
>>> > > > > > > > details
>>> > > > > > > > >> added) was:
>>> > > > > > > > >>
>>> > > > > > > > >>    1. Allow user to stamp the message when produce
>>> > > > > > > > >>    2. When broker receives a message it take a look at
>>> the
>>> > > > > > difference
>>> > > > > > > > >>    between its local time and the timestamp in the
>>> message.
>>> > > > > > > > >>       - If the time difference is within a configurable
>>> > > > > > > > >>       max.message.time.difference.ms, the server will
>>> > accept
>>> > > it
>>> > > > > and
>>> > > > > > > > append
>>> > > > > > > > >>       it to the log.
>>> > > > > > > > >>       - If the time difference is beyond the configured
>>> > > > > > > > >>       max.message.time.difference.ms, the server will
>>> > > override
>>> > > > > the
>>> > > > > > > > >>       timestamp with its current local time and append
>>> the
>>> > > > message
>>> > > > > > to
>>> > > > > > > > the
>>> > > > > > > > >> log.
>>> > > > > > > > >>       - The default value of max.message.time.difference
>>> > would
>>> > > > be
>>> > > > > > set
>>> > > > > > > to
>>> > > > > > > > >>       Long.MaxValue.
>>> > > > > > > > >>       3. The configurable time difference threshold
>>> > > > > > > > >>    max.message.time.difference.ms will be a per topic
>>> > > > > > configuration.
>>> > > > > > > > >>    4. The indexed will be built so it has the following
>>> > > > guarantee.
>>> > > > > > > > >>       - If user search by time stamp:
>>> > > > > > > > >>    - all the messages after that timestamp will be
>>> consumed.
>>> > > > > > > > >>       - user might see earlier messages.
>>> > > > > > > > >>       - The log retention will take a look at the last
>>> time
>>> > > > index
>>> > > > > > > entry
>>> > > > > > > > in
>>> > > > > > > > >>       the time index file. Because the last entry will
>>> be
>>> > the
>>> > > > > latest
>>> > > > > > > > >> timestamp in
>>> > > > > > > > >>       the entire log segment. If that entry expires,
>>> the log
>>> > > > > segment
>>> > > > > > > > will
>>> > > > > > > > >> be
>>> > > > > > > > >>       deleted.
>>> > > > > > > > >>       - The log rolling has to depend on the earliest
>>> > > timestamp.
>>> > > > > In
>>> > > > > > > this
>>> > > > > > > > >>       case we may need to keep a in memory timestamp
>>> only
>>> > for
>>> > > > the
>>> > > > > > > > >> current active
>>> > > > > > > > >>       log. On recover, we will need to read the active
>>> log
>>> > > > segment
>>> > > > > > to
>>> > > > > > > > get
>>> > > > > > > > >> this
>>> > > > > > > > >>       timestamp of the earliest messages.
>>> > > > > > > > >>    5. The downside of this proposal are:
>>> > > > > > > > >>       - The timestamp might not be monotonically
>>> increasing.
>>> > > > > > > > >>       - The log retention might become
>>> non-deterministic.
>>> > i.e.
>>> > > > > When
>>> > > > > > a
>>> > > > > > > > >>       message will be deleted now depends on the
>>> timestamp
>>> > of
>>> > > > the
>>> > > > > > > > >> other messages
>>> > > > > > > > >>       in the same log segment. And those timestamps are
>>> > > provided
>>> > > > > by
>>> > > > > > > > >> user within a
>>> > > > > > > > >>       range depending on what the time difference
>>> threshold
>>> > > > > > > > configuration
>>> > > > > > > > >> is.
>>> > > > > > > > >>       - The semantic meaning of the timestamp in the
>>> > messages
>>> > > > > could
>>> > > > > > > be a
>>> > > > > > > > >>       little bit vague because some of them come from
>>> the
>>> > > > producer
>>> > > > > > and
>>> > > > > > > > >> some of
>>> > > > > > > > >>       them are overwritten by brokers.
>>> > > > > > > > >>       6. Although the proposal has some downsides, it
>>> gives
>>> > > user
>>> > > > > the
>>> > > > > > > > >>    flexibility to use the timestamp.
>>> > > > > > > > >>    - If the threshold is set to Long.MaxValue. The
>>> timestamp
>>> > > in
>>> > > > > the
>>> > > > > > > > message
>>> > > > > > > > >>       is equivalent to CreateTime.
>>> > > > > > > > >>       - If the threshold is set to 0. The timestamp in
>>> the
>>> > > > message
>>> > > > > > is
>>> > > > > > > > >>       equivalent to LogAppendTime.
>>> > > > > > > > >>
>>> > > > > > > > >> This proposal actually allows user to use either
>>> CreateTime
>>> > or
>>> > > > > > > > >> LogAppendTime without introducing two timestamp concept
>>> at
>>> > the
>>> > > > > same
>>> > > > > > > > time. I
>>> > > > > > > > >> have updated the wiki for KIP-32 and KIP-33 with this
>>> > > proposal.
>>> > > > > > > > >>
>>> > > > > > > > >> One thing I am thinking is that instead of having a time
>>> > > > > difference
>>> > > > > > > > >> threshold, should we simply set have a TimestampType
>>> > > > > configuration?
>>> > > > > > > > Because
>>> > > > > > > > >> in most cases, people will either set the threshold to
>>> 0 or
>>> > > > > > > > Long.MaxValue.
>>> > > > > > > > >> Setting anything in between will make the timestamp in
>>> the
>>> > > > message
>>> > > > > > > > >> meaningless to user - user don't know if the timestamp
>>> has
>>> > > been
>>> > > > > > > > overwritten
>>> > > > > > > > >> by the brokers.
>>> > > > > > > > >>
>>> > > > > > > > >> Any thoughts?
>>> > > > > > > > >>
>>> > > > > > > > >> Thanks,
>>> > > > > > > > >> Jiangjie (Becket) Qin
>>> > > > > > > > >>
>>> > > > > > > > >> On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin <
>>> > > > j...@linkedin.com>
>>> > > > > > > > wrote:
>>> > > > > > > > >>
>>> > > > > > > > >> > Hi Jay,
>>> > > > > > > > >> >
>>> > > > > > > > >> > Thanks for such detailed explanation. I think we both
>>> are
>>> > > > trying
>>> > > > > > to
>>> > > > > > > > make
>>> > > > > > > > >> > CreateTime work for us if possible. To me by "work" it
>>> > means
>>> > > > > clear
>>> > > > > > > > >> > guarantees on:
>>> > > > > > > > >> > 1. Log Retention Time enforcement.
>>> > > > > > > > >> > 2. Log Rolling time enforcement (This might be less a
>>> > > concern
>>> > > > as
>>> > > > > > you
>>> > > > > > > > >> > pointed out)
>>> > > > > > > > >> > 3. Application search message by time.
>>> > > > > > > > >> >
>>> > > > > > > > >> > WRT (1), I agree the expectation for log retention
>>> might
>>> > be
>>> > > > > > > different
>>> > > > > > > > >> > depending on who we ask. But my concern is about the
>>> level
>>> > > of
>>> > > > > > > > guarantee
>>> > > > > > > > >> we
>>> > > > > > > > >> > give to user. My observation is that a clear
>>> guarantee to
>>> > > user
>>> > > > > is
>>> > > > > > > > >> critical
>>> > > > > > > > >> > regardless of the mechanism we choose. And this is the
>>> > > subtle
>>> > > > > but
>>> > > > > > > > >> important
>>> > > > > > > > >> > difference between using LogAppendTime and CreateTime.
>>> > > > > > > > >> >
>>> > > > > > > > >> > Let's say user asks this question: How long will my
>>> > message
>>> > > > stay
>>> > > > > > in
>>> > > > > > > > >> Kafka?
>>> > > > > > > > >> >
>>> > > > > > > > >> > If we use LogAppendTime for log retention, the answer
>>> is
>>> > > > message
>>> > > > > > > will
>>> > > > > > > > >> stay
>>> > > > > > > > >> > in Kafka for retention time after the message is
>>> produced
>>> > > (to
>>> > > > be
>>> > > > > > > more
>>> > > > > > > > >> > precise, upper bounded by log.rolling.ms +
>>> > log.retention.ms
>>> > > ).
>>> > > > > > User
>>> > > > > > > > has a
>>> > > > > > > > >> > clear guarantee and they may decide whether or not to
>>> put
>>> > > the
>>> > > > > > > message
>>> > > > > > > > >> into
>>> > > > > > > > >> > Kafka. Or how to adjust the retention time according
>>> to
>>> > > their
>>> > > > > > > > >> requirements.
>>> > > > > > > > >> > If we use create time for log retention, the answer
>>> would
>>> > be
>>> > > > it
>>> > > > > > > > depends.
>>> > > > > > > > >> > The best answer we can give is at least retention.ms
>>> > > because
>>> > > > > > there
>>> > > > > > > > is no
>>> > > > > > > > >> > guarantee when the messages will be deleted after
>>> that.
>>> > If a
>>> > > > > > message
>>> > > > > > > > sits
>>> > > > > > > > >> > somewhere behind a larger create time, the message
>>> might
>>> > > stay
>>> > > > > > longer
>>> > > > > > > > than
>>> > > > > > > > >> > expected. But we don't know how longer it would be
>>> because
>>> > > it
>>> > > > > > > depends
>>> > > > > > > > on
>>> > > > > > > > >> > the create time. In this case, it is hard for user to
>>> > decide
>>> > > > > what
>>> > > > > > to
>>> > > > > > > > do.
>>> > > > > > > > >> >
>>> > > > > > > > >> > I am worrying about this because a blurring guarantee
>>> has
>>> > > > bitten
>>> > > > > > us
>>> > > > > > > > >> > before, e.g. Topic creation. We have received many
>>> > questions
>>> > > > > like
>>> > > > > > > > "why my
>>> > > > > > > > >> > topic is not there after I created it". I can imagine
>>> we
>>> > > > receive
>>> > > > > > > > similar
>>> > > > > > > > >> > question asking "why my message is still there after
>>> > > retention
>>> > > > > > time
>>> > > > > > > > has
>>> > > > > > > > >> > reached". So my understanding is that a clear and
>>> solid
>>> > > > > guarantee
>>> > > > > > is
>>> > > > > > > > >> better
>>> > > > > > > > >> > than having a mechanism that works in most cases but
>>> > > > > occasionally
>>> > > > > > > does
>>> > > > > > > > >> not
>>> > > > > > > > >> > work.
>>> > > > > > > > >> >
>>> > > > > > > > >> > If we think of the retention guarantee we provide with
>>> > > > > > > LogAppendTime,
>>> > > > > > > > it
>>> > > > > > > > >> > is not broken as you said, because we are telling
>>> user the
>>> > > log
>>> > > > > > > > retention
>>> > > > > > > > >> is
>>> > > > > > > > >> > NOT based on create time at the first place.
>>> > > > > > > > >> >
>>> > > > > > > > >> > WRT (3), no matter whether we index on LogAppendTime
>>> or
>>> > > > > > CreateTime,
>>> > > > > > > > the
>>> > > > > > > > >> > best guarantee we can provide with user is "not
>>> missing
>>> > > > message
>>> > > > > > > after
>>> > > > > > > > a
>>> > > > > > > > >> > certain timestamp". Therefore I actually really like
>>> to
>>> > > index
>>> > > > on
>>> > > > > > > > >> CreateTime
>>> > > > > > > > >> > because that is the timestamp we provide to user, and
>>> we
>>> > can
>>> > > > > have
>>> > > > > > > the
>>> > > > > > > > >> solid
>>> > > > > > > > >> > guarantee.
>>> > > > > > > > >> > On the other hand, indexing on LogAppendTime and
>>> giving
>>> > user
>>> > > > > > > > CreateTime
>>> > > > > > > > >> > does not provide solid guarantee when user do search
>>> based
>>> > > on
>>> > > > > > > > timestamp.
>>> > > > > > > > >> It
>>> > > > > > > > >> > only works when LogAppendTime is always no earlier
>>> than
>>> > > > > > CreateTime.
>>> > > > > > > > This
>>> > > > > > > > >> is
>>> > > > > > > > >> > a reasonable assumption and we can easily enforce it.
>>> > > > > > > > >> >
>>> > > > > > > > >> > With above, I am not sure if we can avoid server
>>> timestamp
>>> > > to
>>> > > > > make
>>> > > > > > > log
>>> > > > > > > > >> > retention work with a clear guarantee. For searching
>>> by
>>> > > > > timestamp
>>> > > > > > > use
>>> > > > > > > > >> case,
>>> > > > > > > > >> > I really want to have the index built on CreateTime.
>>> But
>>> > > with
>>> > > > a
>>> > > > > > > > >> reasonable
>>> > > > > > > > >> > assumption and timestamp enforcement, a LogAppendTime
>>> > index
>>> > > > > would
>>> > > > > > > also
>>> > > > > > > > >> work.
>>> > > > > > > > >> >
>>> > > > > > > > >> > Thanks,
>>> > > > > > > > >> >
>>> > > > > > > > >> > Jiangjie (Becket) Qin
>>> > > > > > > > >> >
>>> > > > > > > > >> >
>>> > > > > > > > >> >
>>> > > > > > > > >> > On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps <
>>> > > j...@confluent.io
>>> > > > >
>>> > > > > > > wrote:
>>> > > > > > > > >> >
>>> > > > > > > > >> >> Hey Becket,
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> Let me see if I can address your concerns:
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> 1. Let's say we have two source clusters that are
>>> > mirrored
>>> > > to
>>> > > > > the
>>> > > > > > > > same
>>> > > > > > > > >> >> > target cluster. For some reason one of the mirror
>>> maker
>>> > > > from
>>> > > > > a
>>> > > > > > > > cluster
>>> > > > > > > > >> >> dies
>>> > > > > > > > >> >> > and after fix the issue we want to resume
>>> mirroring. In
>>> > > > this
>>> > > > > > case
>>> > > > > > > > it
>>> > > > > > > > >> is
>>> > > > > > > > >> >> > possible that when the mirror maker resumes
>>> mirroring,
>>> > > the
>>> > > > > > > > timestamp
>>> > > > > > > > >> of
>>> > > > > > > > >> >> the
>>> > > > > > > > >> >> > messages have already gone beyond the acceptable
>>> > > timestamp
>>> > > > > > range
>>> > > > > > > on
>>> > > > > > > > >> >> broker.
>>> > > > > > > > >> >> > In order to let those messages go through, we have
>>> to
>>> > > bump
>>> > > > up
>>> > > > > > the
>>> > > > > > > > >> >> > *max.append.delay
>>> > > > > > > > >> >> > *for all the topics on the target broker. This
>>> could be
>>> > > > > > painful.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> Actually what I was suggesting was different. Here
>>> is my
>>> > > > > > > observation:
>>> > > > > > > > >> >> clusters/topics directly produced to by applications
>>> > have a
>>> > > > > valid
>>> > > > > > > > >> >> assertion
>>> > > > > > > > >> >> that log append time and create time are similar
>>> (let's
>>> > > call
>>> > > > > > these
>>> > > > > > > > >> >> "unbuffered"); other cluster/topic such as those that
>>> > > receive
>>> > > > > > data
>>> > > > > > > > from
>>> > > > > > > > >> a
>>> > > > > > > > >> >> database, a log file, or another kafka cluster don't
>>> have
>>> > > > that
>>> > > > > > > > >> assertion,
>>> > > > > > > > >> >> for these "buffered" clusters data can be arbitrarily
>>> > late.
>>> > > > > This
>>> > > > > > > > means
>>> > > > > > > > >> any
>>> > > > > > > > >> >> use of log append time on these buffered clusters is
>>> not
>>> > > very
>>> > > > > > > > >> meaningful,
>>> > > > > > > > >> >> and create time and log append time "should" be
>>> similar
>>> > on
>>> > > > > > > unbuffered
>>> > > > > > > > >> >> clusters so you can probably use either.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> Using log append time on buffered clusters actually
>>> > results
>>> > > > in
>>> > > > > > bad
>>> > > > > > > > >> things.
>>> > > > > > > > >> >> If you request the offset for a given time you get
>>> don't
>>> > > end
>>> > > > up
>>> > > > > > > > getting
>>> > > > > > > > >> >> data for that time but rather data that showed up at
>>> that
>>> > > > time.
>>> > > > > > If
>>> > > > > > > > you
>>> > > > > > > > >> try
>>> > > > > > > > >> >> to retain 7 days of data it may mostly work but any
>>> kind
>>> > of
>>> > > > > > > > >> bootstrapping
>>> > > > > > > > >> >> will result in retaining much more (potentially the
>>> whole
>>> > > > > > database
>>> > > > > > > > >> >> contents!).
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> So what I am suggesting in terms of the use of the
>>> > > > > > max.append.delay
>>> > > > > > > > is
>>> > > > > > > > >> >> that
>>> > > > > > > > >> >> unbuffered clusters would have this set and buffered
>>> > > clusters
>>> > > > > > would
>>> > > > > > > > not.
>>> > > > > > > > >> >> In
>>> > > > > > > > >> >> other words, in LI terminology, tracking and metrics
>>> > > clusters
>>> > > > > > would
>>> > > > > > > > have
>>> > > > > > > > >> >> this enforced, aggregate and replica clusters
>>> wouldn't.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> So you DO have the issue of potentially maintaining
>>> more
>>> > > data
>>> > > > > > than
>>> > > > > > > > you
>>> > > > > > > > >> >> need
>>> > > > > > > > >> >> to on aggregate clusters if your mirroring skews,
>>> but you
>>> > > > DON'T
>>> > > > > > > need
>>> > > > > > > > to
>>> > > > > > > > >> >> tweak the setting as you described.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> 2. Let's say in the above scenario we let the
>>> messages
>>> > in,
>>> > > at
>>> > > > > > that
>>> > > > > > > > point
>>> > > > > > > > >> >> > some log segments in the target cluster might have
>>> a
>>> > wide
>>> > > > > range
>>> > > > > > > of
>>> > > > > > > > >> >> > timestamps, like Guozhang mentioned the log rolling
>>> > could
>>> > > > be
>>> > > > > > > tricky
>>> > > > > > > > >> >> because
>>> > > > > > > > >> >> > the first time index entry does not necessarily
>>> have
>>> > the
>>> > > > > > smallest
>>> > > > > > > > >> >> timestamp
>>> > > > > > > > >> >> > of all the messages in the log segment. Instead,
>>> it is
>>> > > the
>>> > > > > > > largest
>>> > > > > > > > >> >> > timestamp ever seen. We have to scan the entire
>>> log to
>>> > > find
>>> > > > > the
>>> > > > > > > > >> message
>>> > > > > > > > >> >> > with smallest offset to see if we should roll.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> I think there are two uses for time-based log
>>> rolling:
>>> > > > > > > > >> >> 1. Making the offset lookup by timestamp work
>>> > > > > > > > >> >> 2. Ensuring we don't retain data indefinitely if it
>>> is
>>> > > > supposed
>>> > > > > > to
>>> > > > > > > > get
>>> > > > > > > > >> >> purged after 7 days
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> But think about these two use cases. (1) is totally
>>> > > obviated
>>> > > > by
>>> > > > > > the
>>> > > > > > > > >> >> time=>offset index we are adding which yields much
>>> more
>>> > > > > granular
>>> > > > > > > > offset
>>> > > > > > > > >> >> lookups. (2) Is actually totally broken if you
>>> switch to
>>> > > > append
>>> > > > > > > time,
>>> > > > > > > > >> >> right? If you want to be sure for security/privacy
>>> > reasons
>>> > > > you
>>> > > > > > only
>>> > > > > > > > >> retain
>>> > > > > > > > >> >> 7 days of data then if the log append and create time
>>> > > diverge
>>> > > > > you
>>> > > > > > > > >> actually
>>> > > > > > > > >> >> violate this requirement.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> I think 95% of people care about (1) which is solved
>>> in
>>> > the
>>> > > > > > > proposal
>>> > > > > > > > and
>>> > > > > > > > >> >> (2) is actually broken today as well as in both
>>> > proposals.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> 3. Theoretically it is possible that an older log
>>> segment
>>> > > > > > contains
>>> > > > > > > > >> >> > timestamps that are older than all the messages in
>>> a
>>> > > newer
>>> > > > > log
>>> > > > > > > > >> segment.
>>> > > > > > > > >> >> It
>>> > > > > > > > >> >> > would be weird that we are supposed to delete the
>>> newer
>>> > > log
>>> > > > > > > segment
>>> > > > > > > > >> >> before
>>> > > > > > > > >> >> > we delete the older log segment.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> The index timestamps would always be a lower bound
>>> (i.e.
>>> > > the
>>> > > > > > > maximum
>>> > > > > > > > at
>>> > > > > > > > >> >> that time) so I don't think that is possible.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >>  4. In bootstrap case, if we reload the data to a
>>> Kafka
>>> > > > > cluster,
>>> > > > > > we
>>> > > > > > > > have
>>> > > > > > > > >> >> to
>>> > > > > > > > >> >> > make sure we configure the topic correctly before
>>> we
>>> > load
>>> > > > the
>>> > > > > > > data.
>>> > > > > > > > >> >> > Otherwise the message might either be rejected
>>> because
>>> > > the
>>> > > > > > > > timestamp
>>> > > > > > > > >> is
>>> > > > > > > > >> >> too
>>> > > > > > > > >> >> > old, or it might be deleted immediately because the
>>> > > > retention
>>> > > > > > > time
>>> > > > > > > > has
>>> > > > > > > > >> >> > reached.
>>> > > > > > > > >> >>
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> See (1).
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> -Jay
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin
>>> > > > > > > > <j...@linkedin.com.invalid
>>> > > > > > > > >> >
>>> > > > > > > > >> >> wrote:
>>> > > > > > > > >> >>
>>> > > > > > > > >> >> > Hey Jay and Guozhang,
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > Thanks a lot for the reply. So if I understand
>>> > correctly,
>>> > > > > Jay's
>>> > > > > > > > >> proposal
>>> > > > > > > > >> >> > is:
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > 1. Let client stamp the message create time.
>>> > > > > > > > >> >> > 2. Broker build index based on client-stamped
>>> message
>>> > > > create
>>> > > > > > > time.
>>> > > > > > > > >> >> > 3. Broker only takes message whose create time is
>>> > withing
>>> > > > > > current
>>> > > > > > > > time
>>> > > > > > > > >> >> > plus/minus T (T is a configuration
>>> *max.append.delay*,
>>> > > > could
>>> > > > > be
>>> > > > > > > > topic
>>> > > > > > > > >> >> level
>>> > > > > > > > >> >> > configuration), if the timestamp is out of this
>>> range,
>>> > > > broker
>>> > > > > > > > rejects
>>> > > > > > > > >> >> the
>>> > > > > > > > >> >> > message.
>>> > > > > > > > >> >> > 4. Because the create time of messages can be out
>>> of
>>> > > order,
>>> > > > > > when
>>> > > > > > > > >> broker
>>> > > > > > > > >> >> > builds the time based index it only provides the
>>> > > guarantee
>>> > > > > that
>>> > > > > > > if
>>> > > > > > > > a
>>> > > > > > > > >> >> > consumer starts consuming from the offset returned
>>> by
>>> > > > > searching
>>> > > > > > > by
>>> > > > > > > > >> >> > timestamp t, they will not miss any message created
>>> > after
>>> > > > t,
>>> > > > > > but
>>> > > > > > > > might
>>> > > > > > > > >> >> see
>>> > > > > > > > >> >> > some messages created before t.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > To build the time based index, every time when a
>>> broker
>>> > > > needs
>>> > > > > > to
>>> > > > > > > > >> insert
>>> > > > > > > > >> >> a
>>> > > > > > > > >> >> > new time index entry, the entry would be
>>> > > > > > > > {Largest_Timestamp_Ever_Seen
>>> > > > > > > > >> ->
>>> > > > > > > > >> >> > Current_Offset}. This basically means any timestamp
>>> > > larger
>>> > > > > than
>>> > > > > > > the
>>> > > > > > > > >> >> > Largest_Timestamp_Ever_Seen must come after this
>>> offset
>>> > > > > because
>>> > > > > > > it
>>> > > > > > > > >> never
>>> > > > > > > > >> >> > saw them before. So we don't miss any message with
>>> > larger
>>> > > > > > > > timestamp.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > (@Guozhang, in this case, for log retention we only
>>> > need
>>> > > to
>>> > > > > > take
>>> > > > > > > a
>>> > > > > > > > >> look
>>> > > > > > > > >> >> at
>>> > > > > > > > >> >> > the last time index entry, because it must be the
>>> > largest
>>> > > > > > > timestamp
>>> > > > > > > > >> >> ever,
>>> > > > > > > > >> >> > if that timestamp is overdue, we can safely delete
>>> any
>>> > > log
>>> > > > > > > segment
>>> > > > > > > > >> >> before
>>> > > > > > > > >> >> > that. So we don't need to scan the log segment
>>> file for
>>> > > log
>>> > > > > > > > retention)
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > I assume that we are still going to have the new
>>> > > > FetchRequest
>>> > > > > > to
>>> > > > > > > > allow
>>> > > > > > > > >> >> the
>>> > > > > > > > >> >> > time index replication for replicas.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > I think Jay's main point here is that we don't
>>> want to
>>> > > have
>>> > > > > two
>>> > > > > > > > >> >> timestamp
>>> > > > > > > > >> >> > concepts in Kafka, which I agree is a reasonable
>>> > concern.
>>> > > > > And I
>>> > > > > > > > also
>>> > > > > > > > >> >> agree
>>> > > > > > > > >> >> > that create time is more meaningful than
>>> LogAppendTime
>>> > > for
>>> > > > > > users.
>>> > > > > > > > But
>>> > > > > > > > >> I
>>> > > > > > > > >> >> am
>>> > > > > > > > >> >> > not sure if making everything base on Create Time
>>> would
>>> > > > work
>>> > > > > in
>>> > > > > > > all
>>> > > > > > > > >> >> cases.
>>> > > > > > > > >> >> > Here are my questions about this approach:
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > 1. Let's say we have two source clusters that are
>>> > > mirrored
>>> > > > to
>>> > > > > > the
>>> > > > > > > > same
>>> > > > > > > > >> >> > target cluster. For some reason one of the mirror
>>> maker
>>> > > > from
>>> > > > > a
>>> > > > > > > > cluster
>>> > > > > > > > >> >> dies
>>> > > > > > > > >> >> > and after fix the issue we want to resume
>>> mirroring. In
>>> > > > this
>>> > > > > > case
>>> > > > > > > > it
>>> > > > > > > > >> is
>>> > > > > > > > >> >> > possible that when the mirror maker resumes
>>> mirroring,
>>> > > the
>>> > > > > > > > timestamp
>>> > > > > > > > >> of
>>> > > > > > > > >> >> the
>>> > > > > > > > >> >> > messages have already gone beyond the acceptable
>>> > > timestamp
>>> > > > > > range
>>> > > > > > > on
>>> > > > > > > > >> >> broker.
>>> > > > > > > > >> >> > In order to let those messages go through, we have
>>> to
>>> > > bump
>>> > > > up
>>> > > > > > the
>>> > > > > > > > >> >> > *max.append.delay
>>> > > > > > > > >> >> > *for all the topics on the target broker. This
>>> could be
>>> > > > > > painful.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > 2. Let's say in the above scenario we let the
>>> messages
>>> > > in,
>>> > > > at
>>> > > > > > > that
>>> > > > > > > > >> point
>>> > > > > > > > >> >> > some log segments in the target cluster might have
>>> a
>>> > wide
>>> > > > > range
>>> > > > > > > of
>>> > > > > > > > >> >> > timestamps, like Guozhang mentioned the log rolling
>>> > could
>>> > > > be
>>> > > > > > > tricky
>>> > > > > > > > >> >> because
>>> > > > > > > > >> >> > the first time index entry does not necessarily
>>> have
>>> > the
>>> > > > > > smallest
>>> > > > > > > > >> >> timestamp
>>> > > > > > > > >> >> > of all the messages in the log segment. Instead,
>>> it is
>>> > > the
>>> > > > > > > largest
>>> > > > > > > > >> >> > timestamp ever seen. We have to scan the entire
>>> log to
>>> > > find
>>> > > > > the
>>> > > > > > > > >> message
>>> > > > > > > > >> >> > with smallest offset to see if we should roll.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > 3. Theoretically it is possible that an older log
>>> > segment
>>> > > > > > > contains
>>> > > > > > > > >> >> > timestamps that are older than all the messages in
>>> a
>>> > > newer
>>> > > > > log
>>> > > > > > > > >> segment.
>>> > > > > > > > >> >> It
>>> > > > > > > > >> >> > would be weird that we are supposed to delete the
>>> newer
>>> > > log
>>> > > > > > > segment
>>> > > > > > > > >> >> before
>>> > > > > > > > >> >> > we delete the older log segment.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > 4. In bootstrap case, if we reload the data to a
>>> Kafka
>>> > > > > cluster,
>>> > > > > > > we
>>> > > > > > > > >> have
>>> > > > > > > > >> >> to
>>> > > > > > > > >> >> > make sure we configure the topic correctly before
>>> we
>>> > load
>>> > > > the
>>> > > > > > > data.
>>> > > > > > > > >> >> > Otherwise the message might either be rejected
>>> because
>>> > > the
>>> > > > > > > > timestamp
>>> > > > > > > > >> is
>>> > > > > > > > >> >> too
>>> > > > > > > > >> >> > old, or it might be deleted immediately because the
>>> > > > retention
>>> > > > > > > time
>>> > > > > > > > has
>>> > > > > > > > >> >> > reached.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > I am very concerned about the operational overhead
>>> and
>>> > > the
>>> > > > > > > > ambiguity
>>> > > > > > > > >> of
>>> > > > > > > > >> >> > guarantees we introduce if we purely rely on
>>> > CreateTime.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > It looks to me that the biggest issue of adopting
>>> > > > CreateTime
>>> > > > > > > > >> everywhere
>>> > > > > > > > >> >> is
>>> > > > > > > > >> >> > CreateTime can have big gaps. These gaps could be
>>> > caused
>>> > > by
>>> > > > > > > several
>>> > > > > > > > >> >> cases:
>>> > > > > > > > >> >> > [1]. Faulty clients
>>> > > > > > > > >> >> > [2]. Natural delays from different source
>>> > > > > > > > >> >> > [3]. Bootstrap
>>> > > > > > > > >> >> > [4]. Failure recovery
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > Jay's alternative proposal solves [1], perhaps
>>> solve
>>> > [2]
>>> > > as
>>> > > > > > well
>>> > > > > > > > if we
>>> > > > > > > > >> >> are
>>> > > > > > > > >> >> > able to set a reasonable max.append.delay. But it
>>> does
>>> > > not
>>> > > > > seem
>>> > > > > > > > >> address
>>> > > > > > > > >> >> [3]
>>> > > > > > > > >> >> > and [4]. I actually doubt if [3] and [4] are
>>> solvable
>>> > > > because
>>> > > > > > it
>>> > > > > > > > looks
>>> > > > > > > > >> >> the
>>> > > > > > > > >> >> > CreateTime gap is unavoidable in those two cases.
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > Thanks,
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > Jiangjie (Becket) Qin
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <
>>> > > > > > > wangg...@gmail.com
>>> > > > > > > > >
>>> > > > > > > > >> >> wrote:
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > > Just to complete Jay's option, here is my
>>> > > understanding:
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > > 1. For log retention: if we want to remove data
>>> > before
>>> > > > time
>>> > > > > > t,
>>> > > > > > > we
>>> > > > > > > > >> look
>>> > > > > > > > >> >> > into
>>> > > > > > > > >> >> > > the index file of each segment and find the
>>> largest
>>> > > > > timestamp
>>> > > > > > > t'
>>> > > > > > > > <
>>> > > > > > > > >> t,
>>> > > > > > > > >> >> > find
>>> > > > > > > > >> >> > > the corresponding timestamp and start scanning
>>> to the
>>> > > end
>>> > > > > of
>>> > > > > > > the
>>> > > > > > > > >> >> segment,
>>> > > > > > > > >> >> > > if there is no entry with timestamp >= t, we can
>>> > delete
>>> > > > > this
>>> > > > > > > > >> segment;
>>> > > > > > > > >> >> if
>>> > > > > > > > >> >> > a
>>> > > > > > > > >> >> > > segment's index smallest timestamp is larger
>>> than t,
>>> > we
>>> > > > can
>>> > > > > > > skip
>>> > > > > > > > >> that
>>> > > > > > > > >> >> > > segment.
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > > 2. For log rolling: if we want to start a new
>>> segment
>>> > > > after
>>> > > > > > > time
>>> > > > > > > > t,
>>> > > > > > > > >> we
>>> > > > > > > > >> >> > look
>>> > > > > > > > >> >> > > into the active segment's index file, if the
>>> largest
>>> > > > > > timestamp
>>> > > > > > > is
>>> > > > > > > > >> >> > already >
>>> > > > > > > > >> >> > > t, we can roll a new segment immediately; if it
>>> is <
>>> > t,
>>> > > > we
>>> > > > > > read
>>> > > > > > > > its
>>> > > > > > > > >> >> > > corresponding offset and start scanning to the
>>> end of
>>> > > the
>>> > > > > > > > segment,
>>> > > > > > > > >> if
>>> > > > > > > > >> >> we
>>> > > > > > > > >> >> > > find a record whose timestamp > t, we can roll a
>>> new
>>> > > > > segment.
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > > For log rolling we only need to possibly scan a
>>> small
>>> > > > > portion
>>> > > > > > > the
>>> > > > > > > > >> >> active
>>> > > > > > > > >> >> > > segment, which should be fine; for log retention
>>> we
>>> > may
>>> > > > in
>>> > > > > > the
>>> > > > > > > > worst
>>> > > > > > > > >> >> case
>>> > > > > > > > >> >> > > end up scanning all segments, but in practice we
>>> may
>>> > > skip
>>> > > > > > most
>>> > > > > > > of
>>> > > > > > > > >> them
>>> > > > > > > > >> >> > > since their smallest timestamp in the index file
>>> is
>>> > > > larger
>>> > > > > > than
>>> > > > > > > > t.
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > > Guozhang
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > > On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <
>>> > > > > > j...@confluent.io>
>>> > > > > > > > >> wrote:
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > > > I think it should be possible to index
>>> out-of-order
>>> > > > > > > timestamps.
>>> > > > > > > > >> The
>>> > > > > > > > >> >> > > > timestamp index would be similar to the offset
>>> > > index, a
>>> > > > > > > memory
>>> > > > > > > > >> >> mapped
>>> > > > > > > > >> >> > > file
>>> > > > > > > > >> >> > > > appended to as part of the log append, but
>>> would
>>> > have
>>> > > > the
>>> > > > > > > > format
>>> > > > > > > > >> >> > > >   timestamp offset
>>> > > > > > > > >> >> > > > The timestamp entries would be monotonic and as
>>> > with
>>> > > > the
>>> > > > > > > offset
>>> > > > > > > > >> >> index
>>> > > > > > > > >> >> > > would
>>> > > > > > > > >> >> > > > be no more often then every 4k (or some
>>> > configurable
>>> > > > > > > threshold
>>> > > > > > > > to
>>> > > > > > > > >> >> keep
>>> > > > > > > > >> >> > > the
>>> > > > > > > > >> >> > > > index small--actually for timestamp it could
>>> > probably
>>> > > > be
>>> > > > > > much
>>> > > > > > > > more
>>> > > > > > > > >> >> > sparse
>>> > > > > > > > >> >> > > > than 4k).
>>> > > > > > > > >> >> > > >
>>> > > > > > > > >> >> > > > A search for a timestamp t yields an offset o
>>> > before
>>> > > > > which
>>> > > > > > no
>>> > > > > > > > >> prior
>>> > > > > > > > >> >> > > message
>>> > > > > > > > >> >> > > > has a timestamp >= t. In other words if you
>>> read
>>> > the
>>> > > > log
>>> > > > > > > > starting
>>> > > > > > > > >> >> with
>>> > > > > > > > >> >> > o
>>> > > > > > > > >> >> > > > you are guaranteed not to miss any messages
>>> > occurring
>>> > > > at
>>> > > > > t
>>> > > > > > or
>>> > > > > > > > >> later
>>> > > > > > > > >> >> > > though
>>> > > > > > > > >> >> > > > you may get many before t (due to
>>> > out-of-orderness).
>>> > > > > Unlike
>>> > > > > > > the
>>> > > > > > > > >> >> offset
>>> > > > > > > > >> >> > > > index this bound doesn't really have to be
>>> tight
>>> > > (i.e.
>>> > > > > > > > probably no
>>> > > > > > > > >> >> need
>>> > > > > > > > >> >> > > to
>>> > > > > > > > >> >> > > > go search the log itself, though you could).
>>> > > > > > > > >> >> > > >
>>> > > > > > > > >> >> > > > -Jay
>>> > > > > > > > >> >> > > >
>>> > > > > > > > >> >> > > > On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <
>>> > > > > > > j...@confluent.io>
>>> > > > > > > > >> >> wrote:
>>> > > > > > > > >> >> > > >
>>> > > > > > > > >> >> > > > > Here's my basic take:
>>> > > > > > > > >> >> > > > > - I agree it would be nice to have a notion
>>> of
>>> > time
>>> > > > > baked
>>> > > > > > > in
>>> > > > > > > > if
>>> > > > > > > > >> it
>>> > > > > > > > >> >> > were
>>> > > > > > > > >> >> > > > > done right
>>> > > > > > > > >> >> > > > > - All the proposals so far seem pretty
>>> complex--I
>>> > > > think
>>> > > > > > > they
>>> > > > > > > > >> might
>>> > > > > > > > >> >> > make
>>> > > > > > > > >> >> > > > > things worse rather than better overall
>>> > > > > > > > >> >> > > > > - I think adding 2x8 byte timestamps to the
>>> > message
>>> > > > is
>>> > > > > > > > probably
>>> > > > > > > > >> a
>>> > > > > > > > >> >> > > > > non-starter from a size perspective
>>> > > > > > > > >> >> > > > > - Even if it isn't in the message, having two
>>> > > notions
>>> > > > > of
>>> > > > > > > time
>>> > > > > > > > >> that
>>> > > > > > > > >> >> > > > control
>>> > > > > > > > >> >> > > > > different things is a bit confusing
>>> > > > > > > > >> >> > > > > - The mechanics of basing retention etc on
>>> log
>>> > > append
>>> > > > > > time
>>> > > > > > > > when
>>> > > > > > > > >> >> > that's
>>> > > > > > > > >> >> > > > not
>>> > > > > > > > >> >> > > > > in the log seem complicated
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > To that end here is a possible 4th option.
>>> Let me
>>> > > > know
>>> > > > > > what
>>> > > > > > > > you
>>> > > > > > > > >> >> > think.
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > The basic idea is that the message creation
>>> time
>>> > is
>>> > > > > > closest
>>> > > > > > > > to
>>> > > > > > > > >> >> what
>>> > > > > > > > >> >> > the
>>> > > > > > > > >> >> > > > > user actually cares about but is dangerous
>>> if set
>>> > > > > wrong.
>>> > > > > > So
>>> > > > > > > > >> rather
>>> > > > > > > > >> >> > than
>>> > > > > > > > >> >> > > > > substitute another notion of time, let's try
>>> to
>>> > > > ensure
>>> > > > > > the
>>> > > > > > > > >> >> > correctness
>>> > > > > > > > >> >> > > of
>>> > > > > > > > >> >> > > > > message creation time by preventing
>>> arbitrarily
>>> > bad
>>> > > > > > message
>>> > > > > > > > >> >> creation
>>> > > > > > > > >> >> > > > times.
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > First, let's see if we can agree that log
>>> append
>>> > > time
>>> > > > > is
>>> > > > > > > not
>>> > > > > > > > >> >> > something
>>> > > > > > > > >> >> > > > > anyone really cares about but rather an
>>> > > > implementation
>>> > > > > > > > detail.
>>> > > > > > > > >> The
>>> > > > > > > > >> >> > > > > timestamp that matters to the user is when
>>> the
>>> > > > message
>>> > > > > > > > occurred
>>> > > > > > > > >> >> (the
>>> > > > > > > > >> >> > > > > creation time). The log append time is
>>> basically
>>> > > just
>>> > > > > an
>>> > > > > > > > >> >> > approximation
>>> > > > > > > > >> >> > > to
>>> > > > > > > > >> >> > > > > this on the assumption that the message
>>> creation
>>> > > and
>>> > > > > the
>>> > > > > > > > message
>>> > > > > > > > >> >> > > receive
>>> > > > > > > > >> >> > > > on
>>> > > > > > > > >> >> > > > > the server occur pretty close together and
>>> the
>>> > > reason
>>> > > > > to
>>> > > > > > > > prefer
>>> > > > > > > > >> .
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > But as these values diverge the issue starts
>>> to
>>> > > > become
>>> > > > > > > > apparent.
>>> > > > > > > > >> >> Say
>>> > > > > > > > >> >> > > you
>>> > > > > > > > >> >> > > > > set the retention to one week and then mirror
>>> > data
>>> > > > > from a
>>> > > > > > > > topic
>>> > > > > > > > >> >> > > > containing
>>> > > > > > > > >> >> > > > > two years of retention. Your intention is
>>> clearly
>>> > > to
>>> > > > > keep
>>> > > > > > > the
>>> > > > > > > > >> last
>>> > > > > > > > >> >> > > week,
>>> > > > > > > > >> >> > > > > but because the mirroring is appending right
>>> now
>>> > > you
>>> > > > > will
>>> > > > > > > > keep
>>> > > > > > > > >> two
>>> > > > > > > > >> >> > > years.
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > The reason we are liking log append time is
>>> > because
>>> > > > we
>>> > > > > > are
>>> > > > > > > > >> >> > > (justifiably)
>>> > > > > > > > >> >> > > > > concerned that in certain situations the
>>> creation
>>> > > > time
>>> > > > > > may
>>> > > > > > > > not
>>> > > > > > > > >> be
>>> > > > > > > > >> >> > > > > trustworthy. This same problem exists on the
>>> > > servers
>>> > > > > but
>>> > > > > > > > there
>>> > > > > > > > >> are
>>> > > > > > > > >> >> > > fewer
>>> > > > > > > > >> >> > > > > servers and they just run the kafka code so
>>> it is
>>> > > > less
>>> > > > > of
>>> > > > > > > an
>>> > > > > > > > >> >> issue.
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > There are two possible ways to handle this:
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > >    1. Just tell people to add size based
>>> > > retention. I
>>> > > > > > think
>>> > > > > > > > this
>>> > > > > > > > >> >> is
>>> > > > > > > > >> >> > not
>>> > > > > > > > >> >> > > > >    entirely unreasonable, we're basically
>>> saying
>>> > we
>>> > > > > > retain
>>> > > > > > > > data
>>> > > > > > > > >> >> based
>>> > > > > > > > >> >> > > on
>>> > > > > > > > >> >> > > > the
>>> > > > > > > > >> >> > > > >    timestamp you give us in the data. If you
>>> give
>>> > > us
>>> > > > > bad
>>> > > > > > > > data we
>>> > > > > > > > >> >> will
>>> > > > > > > > >> >> > > > retain
>>> > > > > > > > >> >> > > > >    it for a bad amount of time. If you want
>>> to
>>> > > ensure
>>> > > > > we
>>> > > > > > > > don't
>>> > > > > > > > >> >> retain
>>> > > > > > > > >> >> > > > "too
>>> > > > > > > > >> >> > > > >    much" data, define "too much" by setting a
>>> > > > > time-based
>>> > > > > > > > >> retention
>>> > > > > > > > >> >> > > > setting.
>>> > > > > > > > >> >> > > > >    This is not entirely unreasonable but
>>> kind of
>>> > > > > suffers
>>> > > > > > > > from a
>>> > > > > > > > >> >> "one
>>> > > > > > > > >> >> > > bad
>>> > > > > > > > >> >> > > > >    apple" problem in a very large
>>> environment.
>>> > > > > > > > >> >> > > > >    2. Prevent bad timestamps. In general we
>>> can't
>>> > > > say a
>>> > > > > > > > >> timestamp
>>> > > > > > > > >> >> is
>>> > > > > > > > >> >> > > bad.
>>> > > > > > > > >> >> > > > >    However the definition we're implicitly
>>> using
>>> > is
>>> > > > > that
>>> > > > > > we
>>> > > > > > > > >> think
>>> > > > > > > > >> >> > there
>>> > > > > > > > >> >> > > > are a
>>> > > > > > > > >> >> > > > >    set of topics/clusters where the creation
>>> > > > timestamp
>>> > > > > > > should
>>> > > > > > > > >> >> always
>>> > > > > > > > >> >> > be
>>> > > > > > > > >> >> > > > "very
>>> > > > > > > > >> >> > > > >    close" to the log append timestamp. This
>>> is
>>> > true
>>> > > > for
>>> > > > > > > data
>>> > > > > > > > >> >> sources
>>> > > > > > > > >> >> > > > that have
>>> > > > > > > > >> >> > > > >    no buffering capability (which at
>>> LinkedIn is
>>> > > very
>>> > > > > > > common,
>>> > > > > > > > >> but
>>> > > > > > > > >> >> is
>>> > > > > > > > >> >> > > > more rare
>>> > > > > > > > >> >> > > > >    elsewhere). The solution in this case
>>> would be
>>> > > to
>>> > > > > > allow
>>> > > > > > > a
>>> > > > > > > > >> >> setting
>>> > > > > > > > >> >> > > > along the
>>> > > > > > > > >> >> > > > >    lines of max.append.delay which checks the
>>> > > > creation
>>> > > > > > > > timestamp
>>> > > > > > > > >> >> > > against
>>> > > > > > > > >> >> > > > the
>>> > > > > > > > >> >> > > > >    server time to look for too large a
>>> > divergence.
>>> > > > The
>>> > > > > > > > solution
>>> > > > > > > > >> >> would
>>> > > > > > > > >> >> > > > either
>>> > > > > > > > >> >> > > > >    be to reject the message or to override it
>>> > with
>>> > > > the
>>> > > > > > > server
>>> > > > > > > > >> >> time.
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > So in LI's environment you would configure
>>> the
>>> > > > clusters
>>> > > > > > > used
>>> > > > > > > > for
>>> > > > > > > > >> >> > > direct,
>>> > > > > > > > >> >> > > > > unbuffered, message production (e.g.
>>> tracking and
>>> > > > > metrics
>>> > > > > > > > local)
>>> > > > > > > > >> >> to
>>> > > > > > > > >> >> > > > enforce
>>> > > > > > > > >> >> > > > > a reasonably aggressive timestamp bound (say
>>> 10
>>> > > > mins),
>>> > > > > > and
>>> > > > > > > > all
>>> > > > > > > > >> >> other
>>> > > > > > > > >> >> > > > > clusters would just inherent these.
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > The downside of this approach is requiring
>>> the
>>> > > > special
>>> > > > > > > > >> >> configuration.
>>> > > > > > > > >> >> > > > > However I think in 99% of environments this
>>> could
>>> > > be
>>> > > > > > > skipped
>>> > > > > > > > >> >> > entirely,
>>> > > > > > > > >> >> > > > it's
>>> > > > > > > > >> >> > > > > only when the ratio of clients to servers
>>> gets so
>>> > > > > massive
>>> > > > > > > > that
>>> > > > > > > > >> you
>>> > > > > > > > >> >> > need
>>> > > > > > > > >> >> > > > to
>>> > > > > > > > >> >> > > > > do this. The primary upside is that you have
>>> a
>>> > > single
>>> > > > > > > > >> >> authoritative
>>> > > > > > > > >> >> > > > notion
>>> > > > > > > > >> >> > > > > of time which is closest to what a user would
>>> > want
>>> > > > and
>>> > > > > is
>>> > > > > > > > stored
>>> > > > > > > > >> >> > > directly
>>> > > > > > > > >> >> > > > > in the message.
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > I'm also assuming there is a workable
>>> approach
>>> > for
>>> > > > > > indexing
>>> > > > > > > > >> >> > > non-monotonic
>>> > > > > > > > >> >> > > > > timestamps, though I haven't actually worked
>>> that
>>> > > > out.
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > -Jay
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > > On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin
>>> > > > > > > > >> >> > <j...@linkedin.com.invalid
>>> > > > > > > > >> >> > > >
>>> > > > > > > > >> >> > > > > wrote:
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > >> Bumping up this thread although most of the
>>> > > > discussion
>>> > > > > > > were
>>> > > > > > > > on
>>> > > > > > > > >> >> the
>>> > > > > > > > >> >> > > > >> discussion thread of KIP-31 :)
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >> I just updated the KIP page to add detailed
>>> > > solution
>>> > > > > for
>>> > > > > > > the
>>> > > > > > > > >> >> option
>>> > > > > > > > >> >> > > > >> (Option
>>> > > > > > > > >> >> > > > >> 3) that does not expose the LogAppendTime to
>>> > user.
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > >
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >>
>>> > > > > > > > >>
>>> > > > > > > >
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >> The option has a minor change to the fetch
>>> > request
>>> > > > to
>>> > > > > > > allow
>>> > > > > > > > >> >> fetching
>>> > > > > > > > >> >> > > > time
>>> > > > > > > > >> >> > > > >> index entry as well. I kind of like this
>>> > solution
>>> > > > > > because
>>> > > > > > > > its
>>> > > > > > > > >> >> just
>>> > > > > > > > >> >> > > doing
>>> > > > > > > > >> >> > > > >> what we need without introducing other
>>> things.
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >> It will be great to see what are the
>>> feedback. I
>>> > > can
>>> > > > > > > explain
>>> > > > > > > > >> more
>>> > > > > > > > >> >> > > during
>>> > > > > > > > >> >> > > > >> tomorrow's KIP hangout.
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >> Thanks,
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >> Jiangjie (Becket) Qin
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >> On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie
>>> Qin <
>>> > > > > > > > >> j...@linkedin.com
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >> > > > wrote:
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >> > Hi Jay,
>>> > > > > > > > >> >> > > > >> >
>>> > > > > > > > >> >> > > > >> > I just copy/pastes here your feedback on
>>> the
>>> > > > > timestamp
>>> > > > > > > > >> proposal
>>> > > > > > > > >> >> > that
>>> > > > > > > > >> >> > > > was
>>> > > > > > > > >> >> > > > >> > in the discussion thread of KIP-31.
>>> Please see
>>> > > the
>>> > > > > > > replies
>>> > > > > > > > >> >> inline.
>>> > > > > > > > >> >> > > > >> > The main change I made compared with
>>> previous
>>> > > > > proposal
>>> > > > > > > is
>>> > > > > > > > to
>>> > > > > > > > >> >> add
>>> > > > > > > > >> >> > > both
>>> > > > > > > > >> >> > > > >> > CreateTime and LogAppendTime to the
>>> message.
>>> > > > > > > > >> >> > > > >> >
>>> > > > > > > > >> >> > > > >> > On Tue, Sep 8, 2015 at 10:57 AM, Jay
>>> Kreps <
>>> > > > > > > > j...@confluent.io
>>> > > > > > > > >> >
>>> > > > > > > > >> >> > > wrote:
>>> > > > > > > > >> >> > > > >> >
>>> > > > > > > > >> >> > > > >> > > Hey Beckett,
>>> > > > > > > > >> >> > > > >> > >
>>> > > > > > > > >> >> > > > >> > > I was proposing splitting up the KIP
>>> just
>>> > for
>>> > > > > > > > simplicity of
>>> > > > > > > > >> >> > > > >> discussion.
>>> > > > > > > > >> >> > > > >> > You
>>> > > > > > > > >> >> > > > >> > > can still implement them in one patch. I
>>> > think
>>> > > > > > > > otherwise it
>>> > > > > > > > >> >> will
>>> > > > > > > > >> >> > > be
>>> > > > > > > > >> >> > > > >> hard
>>> > > > > > > > >> >> > > > >> > to
>>> > > > > > > > >> >> > > > >> > > discuss/vote on them since if you like
>>> the
>>> > > > offset
>>> > > > > > > > proposal
>>> > > > > > > > >> >> but
>>> > > > > > > > >> >> > not
>>> > > > > > > > >> >> > > > the
>>> > > > > > > > >> >> > > > >> > time
>>> > > > > > > > >> >> > > > >> > > proposal what do you do?
>>> > > > > > > > >> >> > > > >> > >
>>> > > > > > > > >> >> > > > >> > > Introducing a second notion of time into
>>> > Kafka
>>> > > > is
>>> > > > > a
>>> > > > > > > > pretty
>>> > > > > > > > >> >> > massive
>>> > > > > > > > >> >> > > > >> > > philosophical change so it kind of
>>> warrants
>>> > > it's
>>> > > > > own
>>> > > > > > > > KIP I
>>> > > > > > > > >> >> think
>>> > > > > > > > >> >> > > it
>>> > > > > > > > >> >> > > > >> > isn't
>>> > > > > > > > >> >> > > > >> > > just "Change message format".
>>> > > > > > > > >> >> > > > >> > >
>>> > > > > > > > >> >> > > > >> > > WRT time I think one thing to clarify
>>> in the
>>> > > > > > proposal
>>> > > > > > > is
>>> > > > > > > > >> how
>>> > > > > > > > >> >> MM
>>> > > > > > > > >> >> > > will
>>> > > > > > > > >> >> > > > >> have
>>> > > > > > > > >> >> > > > >> > > access to set the timestamp? Presumably
>>> this
>>> > > > will
>>> > > > > > be a
>>> > > > > > > > new
>>> > > > > > > > >> >> field
>>> > > > > > > > >> >> > > in
>>> > > > > > > > >> >> > > > >> > > ProducerRecord, right? If so then any
>>> user
>>> > can
>>> > > > set
>>> > > > > > the
>>> > > > > > > > >> >> > timestamp,
>>> > > > > > > > >> >> > > > >> right?
>>> > > > > > > > >> >> > > > >> > > I'm not sure you answered the questions
>>> > around
>>> > > > how
>>> > > > > > > this
>>> > > > > > > > >> will
>>> > > > > > > > >> >> > work
>>> > > > > > > > >> >> > > > for
>>> > > > > > > > >> >> > > > >> MM
>>> > > > > > > > >> >> > > > >> > > since when MM retains timestamps from
>>> > multiple
>>> > > > > > > > partitions
>>> > > > > > > > >> >> they
>>> > > > > > > > >> >> > > will
>>> > > > > > > > >> >> > > > >> then
>>> > > > > > > > >> >> > > > >> > be
>>> > > > > > > > >> >> > > > >> > > out of order and in the past (so the
>>> > > > > > > > >> >> max(lastAppendedTimestamp,
>>> > > > > > > > >> >> > > > >> > > currentTimeMillis) override you proposed
>>> > will
>>> > > > not
>>> > > > > > > work,
>>> > > > > > > > >> >> right?).
>>> > > > > > > > >> >> > > If
>>> > > > > > > > >> >> > > > we
>>> > > > > > > > >> >> > > > >> > > don't do this then when you set up
>>> mirroring
>>> > > the
>>> > > > > > data
>>> > > > > > > > will
>>> > > > > > > > >> >> all
>>> > > > > > > > >> >> > be
>>> > > > > > > > >> >> > > > new
>>> > > > > > > > >> >> > > > >> and
>>> > > > > > > > >> >> > > > >> > > you have the same retention problem you
>>> > > > described.
>>> > > > > > > > Maybe I
>>> > > > > > > > >> >> > missed
>>> > > > > > > > >> >> > > > >> > > something...?
>>> > > > > > > > >> >> > > > >> > lastAppendedTimestamp means the timestamp
>>> of
>>> > the
>>> > > > > > message
>>> > > > > > > > that
>>> > > > > > > > >> >> last
>>> > > > > > > > >> >> > > > >> > appended to the log.
>>> > > > > > > > >> >> > > > >> > If a broker is a leader, since it will
>>> assign
>>> > > the
>>> > > > > > > > timestamp
>>> > > > > > > > >> by
>>> > > > > > > > >> >> > > itself,
>>> > > > > > > > >> >> > > > >> the
>>> > > > > > > > >> >> > > > >> > lastAppenedTimestamp will be its local
>>> clock
>>> > > when
>>> > > > > > append
>>> > > > > > > > the
>>> > > > > > > > >> >> last
>>> > > > > > > > >> >> > > > >> message.
>>> > > > > > > > >> >> > > > >> > So if there is no leader migration,
>>> > > > > > > > >> max(lastAppendedTimestamp,
>>> > > > > > > > >> >> > > > >> > currentTimeMillis) = currentTimeMillis.
>>> > > > > > > > >> >> > > > >> > If a broker is a follower, because it will
>>> > keep
>>> > > > the
>>> > > > > > > > leader's
>>> > > > > > > > >> >> > > timestamp
>>> > > > > > > > >> >> > > > >> > unchanged, the lastAppendedTime would be
>>> the
>>> > > > > leader's
>>> > > > > > > > clock
>>> > > > > > > > >> >> when
>>> > > > > > > > >> >> > it
>>> > > > > > > > >> >> > > > >> appends
>>> > > > > > > > >> >> > > > >> > that message message. It keeps track of
>>> the
>>> > > > > > > > lastAppendedTime
>>> > > > > > > > >> >> only
>>> > > > > > > > >> >> > in
>>> > > > > > > > >> >> > > > >> case
>>> > > > > > > > >> >> > > > >> > it becomes leader later on. At that
>>> point, it
>>> > is
>>> > > > > > > possible
>>> > > > > > > > >> that
>>> > > > > > > > >> >> the
>>> > > > > > > > >> >> > > > >> > timestamp of the last appended message was
>>> > > stamped
>>> > > > > by
>>> > > > > > > old
>>> > > > > > > > >> >> leader,
>>> > > > > > > > >> >> > > but
>>> > > > > > > > >> >> > > > >> the
>>> > > > > > > > >> >> > > > >> > new leader's currentTimeMillis <
>>> > > lastAppendedTime.
>>> > > > > If
>>> > > > > > a
>>> > > > > > > > new
>>> > > > > > > > >> >> > message
>>> > > > > > > > >> >> > > > >> comes,
>>> > > > > > > > >> >> > > > >> > instead of stamp it with new leader's
>>> > > > > > currentTimeMillis,
>>> > > > > > > > we
>>> > > > > > > > >> >> have
>>> > > > > > > > >> >> > to
>>> > > > > > > > >> >> > > > >> stamp
>>> > > > > > > > >> >> > > > >> > it to lastAppendedTime to avoid the
>>> timestamp
>>> > in
>>> > > > the
>>> > > > > > log
>>> > > > > > > > >> going
>>> > > > > > > > >> >> > > > backward.
>>> > > > > > > > >> >> > > > >> > The max(lastAppendedTimestamp,
>>> > > currentTimeMillis)
>>> > > > is
>>> > > > > > > > purely
>>> > > > > > > > >> >> based
>>> > > > > > > > >> >> > on
>>> > > > > > > > >> >> > > > the
>>> > > > > > > > >> >> > > > >> > broker side clock. If MM produces message
>>> with
>>> > > > > > different
>>> > > > > > > > >> >> > > LogAppendTime
>>> > > > > > > > >> >> > > > >> in
>>> > > > > > > > >> >> > > > >> > source clusters to the same target
>>> cluster,
>>> > the
>>> > > > > > > > LogAppendTime
>>> > > > > > > > >> >> will
>>> > > > > > > > >> >> > > be
>>> > > > > > > > >> >> > > > >> > ignored  re-stamped by target cluster.
>>> > > > > > > > >> >> > > > >> > I added a use case example for mirror
>>> maker in
>>> > > > > KIP-32.
>>> > > > > > > > Also
>>> > > > > > > > >> >> there
>>> > > > > > > > >> >> > > is a
>>> > > > > > > > >> >> > > > >> > corner case discussion about when we need
>>> the
>>> > > > > > > > >> >> > max(lastAppendedTime,
>>> > > > > > > > >> >> > > > >> > currentTimeMillis) trick. Could you take a
>>> > look
>>> > > > and
>>> > > > > > see
>>> > > > > > > if
>>> > > > > > > > >> that
>>> > > > > > > > >> >> > > > answers
>>> > > > > > > > >> >> > > > >> > your question?
>>> > > > > > > > >> >> > > > >> >
>>> > > > > > > > >> >> > > > >> > >
>>> > > > > > > > >> >> > > > >> > > My main motivation is that given that
>>> both
>>> > > Samza
>>> > > > > and
>>> > > > > > > > Kafka
>>> > > > > > > > >> >> > streams
>>> > > > > > > > >> >> > > > are
>>> > > > > > > > >> >> > > > >> > > doing work that implies a mandatory
>>> > > > client-defined
>>> > > > > > > > notion
>>> > > > > > > > >> of
>>> > > > > > > > >> >> > > time, I
>>> > > > > > > > >> >> > > > >> > really
>>> > > > > > > > >> >> > > > >> > > think introducing a different mandatory
>>> > notion
>>> > > > of
>>> > > > > > time
>>> > > > > > > > in
>>> > > > > > > > >> >> Kafka
>>> > > > > > > > >> >> > is
>>> > > > > > > > >> >> > > > >> going
>>> > > > > > > > >> >> > > > >> > to
>>> > > > > > > > >> >> > > > >> > > be quite odd. We should think hard
>>> about how
>>> > > > > > > > client-defined
>>> > > > > > > > >> >> time
>>> > > > > > > > >> >> > > > could
>>> > > > > > > > >> >> > > > >> > > work. I'm not sure if it can, but I'm
>>> also
>>> > not
>>> > > > > sure
>>> > > > > > > > that it
>>> > > > > > > > >> >> > can't.
>>> > > > > > > > >> >> > > > >> Having
>>> > > > > > > > >> >> > > > >> > > both will be odd. Did you chat about
>>> this
>>> > with
>>> > > > > > > > Yi/Kartik on
>>> > > > > > > > >> >> the
>>> > > > > > > > >> >> > > > Samza
>>> > > > > > > > >> >> > > > >> > side?
>>> > > > > > > > >> >> > > > >> > I talked with Kartik and realized that it
>>> > would
>>> > > be
>>> > > > > > > useful
>>> > > > > > > > to
>>> > > > > > > > >> >> have
>>> > > > > > > > >> >> > a
>>> > > > > > > > >> >> > > > >> client
>>> > > > > > > > >> >> > > > >> > timestamp to facilitate use cases like
>>> stream
>>> > > > > > > processing.
>>> > > > > > > > >> >> > > > >> > I was trying to figure out if we can
>>> simply
>>> > use
>>> > > > > client
>>> > > > > > > > >> >> timestamp
>>> > > > > > > > >> >> > > > without
>>> > > > > > > > >> >> > > > >> > introducing the server time. There are
>>> some
>>> > > > > discussion
>>> > > > > > > in
>>> > > > > > > > the
>>> > > > > > > > >> >> KIP.
>>> > > > > > > > >> >> > > > >> > The key problem we want to solve here is
>>> > > > > > > > >> >> > > > >> > 1. We want log retention and rolling to
>>> depend
>>> > > on
>>> > > > > > server
>>> > > > > > > > >> clock.
>>> > > > > > > > >> >> > > > >> > 2. We want to make sure the log-assiciated
>>> > > > timestamp
>>> > > > > > to
>>> > > > > > > be
>>> > > > > > > > >> >> > retained
>>> > > > > > > > >> >> > > > when
>>> > > > > > > > >> >> > > > >> > replicas moves.
>>> > > > > > > > >> >> > > > >> > 3. We want to use the timestamp in some
>>> way
>>> > that
>>> > > > can
>>> > > > > > > allow
>>> > > > > > > > >> >> > searching
>>> > > > > > > > >> >> > > > by
>>> > > > > > > > >> >> > > > >> > timestamp.
>>> > > > > > > > >> >> > > > >> > For 1 and 2, an alternative is to pass the
>>> > > > > > > log-associated
>>> > > > > > > > >> >> > timestamp
>>> > > > > > > > >> >> > > > >> > through replication, that means we need to
>>> > have
>>> > > a
>>> > > > > > > > different
>>> > > > > > > > >> >> > protocol
>>> > > > > > > > >> >> > > > for
>>> > > > > > > > >> >> > > > >> > replica fetching to pass log-associated
>>> > > timestamp.
>>> > > > > It
>>> > > > > > is
>>> > > > > > > > >> >> actually
>>> > > > > > > > >> >> > > > >> > complicated and there could be a lot of
>>> corner
>>> > > > cases
>>> > > > > > to
>>> > > > > > > > >> handle.
>>> > > > > > > > >> >> > e.g.
>>> > > > > > > > >> >> > > > >> what
>>> > > > > > > > >> >> > > > >> > if an old leader started to fetch from
>>> the new
>>> > > > > leader,
>>> > > > > > > > should
>>> > > > > > > > >> >> it
>>> > > > > > > > >> >> > > also
>>> > > > > > > > >> >> > > > >> > update all of its old log segment
>>> timestamp?
>>> > > > > > > > >> >> > > > >> > I think actually client side timestamp
>>> would
>>> > be
>>> > > > > better
>>> > > > > > > > for 3
>>> > > > > > > > >> >> if we
>>> > > > > > > > >> >> > > can
>>> > > > > > > > >> >> > > > >> > find a way to make it work.
>>> > > > > > > > >> >> > > > >> > So far I am not able to convince myself
>>> that
>>> > > only
>>> > > > > > having
>>> > > > > > > > >> client
>>> > > > > > > > >> >> > side
>>> > > > > > > > >> >> > > > >> > timestamp would work mainly because 1 and
>>> 2.
>>> > > There
>>> > > > > > are a
>>> > > > > > > > few
>>> > > > > > > > >> >> > > > situations
>>> > > > > > > > >> >> > > > >> I
>>> > > > > > > > >> >> > > > >> > mentioned in the KIP.
>>> > > > > > > > >> >> > > > >> > >
>>> > > > > > > > >> >> > > > >> > > When you are saying it won't work you
>>> are
>>> > > > assuming
>>> > > > > > > some
>>> > > > > > > > >> >> > particular
>>> > > > > > > > >> >> > > > >> > > implementation? Maybe that the index is
>>> a
>>> > > > > > > monotonically
>>> > > > > > > > >> >> > increasing
>>> > > > > > > > >> >> > > > >> set of
>>> > > > > > > > >> >> > > > >> > > pointers to the least record with a
>>> > timestamp
>>> > > > > larger
>>> > > > > > > > than
>>> > > > > > > > >> the
>>> > > > > > > > >> >> > > index
>>> > > > > > > > >> >> > > > >> time?
>>> > > > > > > > >> >> > > > >> > > In other words a search for time X
>>> gives the
>>> > > > > largest
>>> > > > > > > > offset
>>> > > > > > > > >> >> at
>>> > > > > > > > >> >> > > which
>>> > > > > > > > >> >> > > > >> all
>>> > > > > > > > >> >> > > > >> > > records are <= X?
>>> > > > > > > > >> >> > > > >> > It is a promising idea. We probably can
>>> have
>>> > an
>>> > > > > > > in-memory
>>> > > > > > > > >> index
>>> > > > > > > > >> >> > like
>>> > > > > > > > >> >> > > > >> that,
>>> > > > > > > > >> >> > > > >> > but might be complicated to have a file on
>>> > disk
>>> > > > like
>>> > > > > > > that.
>>> > > > > > > > >> >> Imagine
>>> > > > > > > > >> >> > > > there
>>> > > > > > > > >> >> > > > >> > are two timestamps T0 < T1. We see
>>> message Y
>>> > > > created
>>> > > > > > at
>>> > > > > > > T1
>>> > > > > > > > >> and
>>> > > > > > > > >> >> > > created
>>> > > > > > > > >> >> > > > >> > index like [T1->Y], then we see message
>>> > created
>>> > > at
>>> > > > > T1,
>>> > > > > > > > >> >> supposedly
>>> > > > > > > > >> >> > we
>>> > > > > > > > >> >> > > > >> should
>>> > > > > > > > >> >> > > > >> > have index look like [T0->X, T1->Y], it is
>>> > easy
>>> > > to
>>> > > > > do
>>> > > > > > in
>>> > > > > > > > >> >> memory,
>>> > > > > > > > >> >> > but
>>> > > > > > > > >> >> > > > we
>>> > > > > > > > >> >> > > > >> > might have to rewrite the index file
>>> > completely.
>>> > > > > Maybe
>>> > > > > > > we
>>> > > > > > > > can
>>> > > > > > > > >> >> have
>>> > > > > > > > >> >> > > the
>>> > > > > > > > >> >> > > > >> > first entry with timestamp to 0, and only
>>> > update
>>> > > > the
>>> > > > > > > first
>>> > > > > > > > >> >> pointer
>>> > > > > > > > >> >> > > for
>>> > > > > > > > >> >> > > > >> any
>>> > > > > > > > >> >> > > > >> > out of range timestamp, so the index will
>>> be
>>> > > > [0->X,
>>> > > > > > > > T1->Y].
>>> > > > > > > > >> >> Also,
>>> > > > > > > > >> >> > > the
>>> > > > > > > > >> >> > > > >> range
>>> > > > > > > > >> >> > > > >> > of timestamps in the log segments can
>>> overlap
>>> > > with
>>> > > > > > each
>>> > > > > > > > >> other.
>>> > > > > > > > >> >> > That
>>> > > > > > > > >> >> > > > >> means
>>> > > > > > > > >> >> > > > >> > we either need to keep a cross segments
>>> index
>>> > > file
>>> > > > > or
>>> > > > > > we
>>> > > > > > > > need
>>> > > > > > > > >> >> to
>>> > > > > > > > >> >> > > check
>>> > > > > > > > >> >> > > > >> all
>>> > > > > > > > >> >> > > > >> > the index file for each log segment.
>>> > > > > > > > >> >> > > > >> > I separated out the time based log index
>>> to
>>> > > KIP-33
>>> > > > > > > > because it
>>> > > > > > > > >> >> can
>>> > > > > > > > >> >> > be
>>> > > > > > > > >> >> > > > an
>>> > > > > > > > >> >> > > > >> > independent follow up feature as Neha
>>> > > suggested. I
>>> > > > > > will
>>> > > > > > > > try
>>> > > > > > > > >> to
>>> > > > > > > > >> >> > make
>>> > > > > > > > >> >> > > > the
>>> > > > > > > > >> >> > > > >> > time based index work with client side
>>> > > timestamp.
>>> > > > > > > > >> >> > > > >> > >
>>> > > > > > > > >> >> > > > >> > > For retention, I agree with the problem
>>> you
>>> > > > point
>>> > > > > > out,
>>> > > > > > > > but
>>> > > > > > > > >> I
>>> > > > > > > > >> >> > think
>>> > > > > > > > >> >> > > > >> what
>>> > > > > > > > >> >> > > > >> > you
>>> > > > > > > > >> >> > > > >> > > are saying in that case is that you
>>> want a
>>> > > size
>>> > > > > > limit
>>> > > > > > > > too.
>>> > > > > > > > >> If
>>> > > > > > > > >> >> > you
>>> > > > > > > > >> >> > > > use
>>> > > > > > > > >> >> > > > >> > > system time you actually hit the same
>>> > problem:
>>> > > > say
>>> > > > > > you
>>> > > > > > > > do a
>>> > > > > > > > >> >> full
>>> > > > > > > > >> >> > > > dump
>>> > > > > > > > >> >> > > > >> of
>>> > > > > > > > >> >> > > > >> > a
>>> > > > > > > > >> >> > > > >> > > DB table with a setting of 7 days
>>> retention,
>>> > > > your
>>> > > > > > > > retention
>>> > > > > > > > >> >> will
>>> > > > > > > > >> >> > > > >> actually
>>> > > > > > > > >> >> > > > >> > > not get enforced for the first 7 days
>>> > because
>>> > > > the
>>> > > > > > data
>>> > > > > > > > is
>>> > > > > > > > >> >> "new
>>> > > > > > > > >> >> > to
>>> > > > > > > > >> >> > > > >> Kafka".
>>> > > > > > > > >> >> > > > >> > I kind of think the size limit here is
>>> > > orthogonal.
>>> > > > > It
>>> > > > > > > is a
>>> > > > > > > > >> >> valid
>>> > > > > > > > >> >> > use
>>> > > > > > > > >> >> > > > >> case
>>> > > > > > > > >> >> > > > >> > where people only want to use time based
>>> > > retention
>>> > > > > > only.
>>> > > > > > > > In
>>> > > > > > > > >> >> your
>>> > > > > > > > >> >> > > > >> example,
>>> > > > > > > > >> >> > > > >> > depending on client timestamp might break
>>> the
>>> > > > > > > > functionality -
>>> > > > > > > > >> >> say
>>> > > > > > > > >> >> > it
>>> > > > > > > > >> >> > > > is
>>> > > > > > > > >> >> > > > >> a
>>> > > > > > > > >> >> > > > >> > bootstrap case people actually need to
>>> read
>>> > all
>>> > > > the
>>> > > > > > > data.
>>> > > > > > > > If
>>> > > > > > > > >> we
>>> > > > > > > > >> >> > > depend
>>> > > > > > > > >> >> > > > >> on
>>> > > > > > > > >> >> > > > >> > the client timestamp, the data might be
>>> > deleted
>>> > > > > > > instantly
>>> > > > > > > > >> when
>>> > > > > > > > >> >> > they
>>> > > > > > > > >> >> > > > >> come to
>>> > > > > > > > >> >> > > > >> > the broker. It might be too demanding to
>>> > expect
>>> > > > the
>>> > > > > > > > broker to
>>> > > > > > > > >> >> > > > understand
>>> > > > > > > > >> >> > > > >> > what people actually want to do with the
>>> data
>>> > > > coming
>>> > > > > > in.
>>> > > > > > > > So
>>> > > > > > > > >> the
>>> > > > > > > > >> >> > > > >> guarantee
>>> > > > > > > > >> >> > > > >> > of using server side timestamp is that
>>> "after
>>> > > > > appended
>>> > > > > > > to
>>> > > > > > > > the
>>> > > > > > > > >> >> log,
>>> > > > > > > > >> >> > > all
>>> > > > > > > > >> >> > > > >> > messages will be available on broker for
>>> > > retention
>>> > > > > > > time",
>>> > > > > > > > >> >> which is
>>> > > > > > > > >> >> > > not
>>> > > > > > > > >> >> > > > >> > changeable by clients.
>>> > > > > > > > >> >> > > > >> > >
>>> > > > > > > > >> >> > > > >> > > -Jay
>>> > > > > > > > >> >> > > > >> >
>>> > > > > > > > >> >> > > > >> > On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie
>>> > Qin <
>>> > > > > > > > >> >> j...@linkedin.com
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > > > >> wrote:
>>> > > > > > > > >> >> > > > >> >
>>> > > > > > > > >> >> > > > >> >> Hi folks,
>>> > > > > > > > >> >> > > > >> >>
>>> > > > > > > > >> >> > > > >> >> This proposal was previously in KIP-31
>>> and we
>>> > > > > > separated
>>> > > > > > > > it
>>> > > > > > > > >> to
>>> > > > > > > > >> >> > > KIP-32
>>> > > > > > > > >> >> > > > >> per
>>> > > > > > > > >> >> > > > >> >> Neha and Jay's suggestion.
>>> > > > > > > > >> >> > > > >> >>
>>> > > > > > > > >> >> > > > >> >> The proposal is to add the following two
>>> > > > timestamps
>>> > > > > > to
>>> > > > > > > > Kafka
>>> > > > > > > > >> >> > > message.
>>> > > > > > > > >> >> > > > >> >> - CreateTime
>>> > > > > > > > >> >> > > > >> >> - LogAppendTime
>>> > > > > > > > >> >> > > > >> >>
>>> > > > > > > > >> >> > > > >> >> The CreateTime will be set by the
>>> producer
>>> > and
>>> > > > will
>>> > > > > > > > change
>>> > > > > > > > >> >> after
>>> > > > > > > > >> >> > > > that.
>>> > > > > > > > >> >> > > > >> >> The LogAppendTime will be set by broker
>>> for
>>> > > > purpose
>>> > > > > > > such
>>> > > > > > > > as
>>> > > > > > > > >> >> > enforce
>>> > > > > > > > >> >> > > > log
>>> > > > > > > > >> >> > > > >> >> retention and log rolling.
>>> > > > > > > > >> >> > > > >> >>
>>> > > > > > > > >> >> > > > >> >> Thanks,
>>> > > > > > > > >> >> > > > >> >>
>>> > > > > > > > >> >> > > > >> >> Jiangjie (Becket) Qin
>>> > > > > > > > >> >> > > > >> >>
>>> > > > > > > > >> >> > > > >> >>
>>> > > > > > > > >> >> > > > >> >
>>> > > > > > > > >> >> > > > >>
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > > >
>>> > > > > > > > >> >> > > >
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> > > --
>>> > > > > > > > >> >> > > -- Guozhang
>>> > > > > > > > >> >> > >
>>> > > > > > > > >> >> >
>>> > > > > > > > >> >>
>>> > > > > > > > >> >
>>> > > > > > > > >> >
>>> > > > > > > > >>
>>> > > > > > > >
>>> > > > > > > >
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>

Reply via email to