1. Hmm, it's more intuitive if the consumer sees the same timestamp whether
the messages are compressed or not. When message.timestamp.type=LogAppendTime,
we will need to set the timestamp in each message if messages are not
compressed, so that the follower gets the same timestamp. So, it seems
that we should do the same thing for inner messages when messages are
compressed.

4. I thought on startup, we restore the timestamp of the latest message by
reading from the time index of the last log segment. So, what happens if
there are no index entries?

Thanks,

Jun

On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin <becket....@gmail.com> wrote:

> Thanks for the explanation, Jun.
>
> 1. That makes sense. So maybe we can do the following:
> (a) Set the timestamp in the compressed message to the latest timestamp of
> all its inner messages. This works for both LogAppendTime and CreateTime.
> (b) If message.timestamp.type=LogAppendTime, the broker will overwrite all
> the inner message timestamps to -1 if they are not already -1. This is
> mainly for topics that are using LogAppendTime. Hopefully the producer will
> set the timestamp to -1 in the ProducerRecord to avoid server-side
> recompression.
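
To make (a) and (b) concrete, here is a rough Python sketch of how a broker
might stamp a compressed message set. It only illustrates the idea above and
is not Kafka code: stamp_compressed, NO_TIMESTAMP and the needs_recompression
flag are made-up names, and it assumes that under LogAppendTime the wrapper
carries the broker's append time.

```python
NO_TIMESTAMP = -1  # sentinel used in this thread for "timestamp not set"


def stamp_compressed(inner_timestamps, timestamp_type, broker_now_ms):
    """Return (wrapper_timestamp, inner_timestamps, needs_recompression).

    Hypothetical helper: all names here are illustrative assumptions.
    """
    if timestamp_type == "LogAppendTime":
        # (b): inner timestamps are cleared to -1. Rewriting any of them
        # means the broker has to recompress the message set.
        needs_recompression = any(ts != NO_TIMESTAMP for ts in inner_timestamps)
        cleared = [NO_TIMESTAMP] * len(inner_timestamps)
        # Assumption: the wrapper then carries the broker's append time.
        return broker_now_ms, cleared, needs_recompression
    # CreateTime, (a): the wrapper carries the latest inner timestamp and
    # the inner messages are left untouched, so nothing is recompressed.
    return max(inner_timestamps), list(inner_timestamps), False
```

With CreateTime the inner timestamps pass through untouched. With
LogAppendTime, recompression is only needed when the producer did not already
send -1.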
>
> 3. I see. That works. So the semantics of log rolling become "roll out the
> log segment if it has been inactive since the latest message arrived."
>
> 4. Yes. If the largest timestamp is in the previous log segment, the time
> index for the current log segment does not have a valid offset in the
> current log segment to point to. Maybe in that case we should build an empty
> log index.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao <j...@confluent.io> wrote:
>
> > 1. I was thinking more about saving the decompression overhead in the
> > follower. Currently, the follower doesn't decompress the messages. To keep
> > it that way, the outer message needs to include the timestamp of the latest
> > inner message to build the time index in the follower. The simplest thing
> > to do is to change the timestamp in the inner messages if necessary, in
> > which case there will be the recompression overhead. However, in the case
> > when the timestamps of the inner messages don't have to be changed
> > (hopefully more common), there won't be the recompression overhead. In
> > either case, we always set the timestamp in the outer message to be the
> > timestamp of the latest inner message, in the leader.
> >
> > 3. Basically, in each log segment, we keep track of the timestamp of the
> > latest message. If current time - timestamp of latest message > log rolling
> > interval, we roll a new log segment. So, if messages with later timestamps
> > keep getting added, we only roll new log segments based on size. On the
> > other hand, if no new messages are added to a log, we can force a log roll
> > based on time, which addresses the issue in (b).
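
The rolling rule in point 3 could be sketched roughly as follows. This is
Python and purely illustrative: should_roll and its parameter names are
hypothetical, not broker code or configuration keys.

```python
def should_roll(now_ms, latest_timestamp_ms, roll_interval_ms,
                segment_bytes, max_segment_bytes):
    """Decide whether the active segment should be rolled (illustrative)."""
    # Size-based rolling applies regardless of timestamps.
    if segment_bytes >= max_segment_bytes:
        return True
    # Time-based rolling: the segment has been idle for longer than the
    # roll interval, measured from the latest message timestamp it holds.
    return now_ms - latest_timestamp_ms > roll_interval_ms
```

As long as messages with later timestamps keep arriving, latest_timestamp_ms
keeps moving forward and only the size check can trigger a roll.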
> >
> > 4. Hmm, the index is per segment and should only point to positions in the
> > corresponding .log file, not previous ones, right?
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks a lot for the comments. Please see inline replies.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Becket,
> > > >
> > > > Thanks for the proposal. Looks good overall. A few comments below.
> > > >
> > > > 1. KIP-32 didn't say what timestamp should be set in a compressed
> > > > message. We probably should set it to the timestamp of the latest
> > > > message included in the compressed one. This way, during indexing, we
> > > > don't have to decompress the message.
> > > >
> > > That is a good point.
> > > In normal cases, the broker needs to decompress the message for
> > > verification purposes anyway, so building the time index does not add
> > > additional decompression.
> > > During time index recovery, however, having a timestamp in the compressed
> > > message might save the decompression.
> > >
> > > Another thing I am thinking is that we should make sure KIP-32 works well
> > > with KIP-31, i.e. we don't want to do recompression in order to add
> > > timestamps to messages.
> > > Taking the approach in my last email, the timestamps in the messages will
> > > either all be overwritten by the server if
> > > message.timestamp.type=LogAppendTime, or they will not be overwritten if
> > > message.timestamp.type=CreateTime.
> > >
> > > Maybe we can use the timestamp in compressed messages in the following way:
> > > If message.timestamp.type=LogAppendTime, we have to overwrite timestamps
> > > for all the messages. We can simply write the timestamp in the compressed
> > > message to avoid recompression.
> > > If message.timestamp.type=CreateTime, we do not need to overwrite the
> > > timestamps. We either reject the entire compressed message or we just
> > > leave the compressed message timestamp as -1.
> > >
> > > So the semantics of the timestamp field in the compressed message become:
> > > if it is greater than 0, that means LogAppendTime is used, and the
> > > timestamp of the inner messages is the compressed message LogAppendTime.
> > > If it is -1, that means CreateTime is used, and the timestamp is in each
> > > individual inner message.
> > >
> > > This sacrifices the speed of recovery but seems worthwhile because we
> > > avoid recompression.
> > >
> > >
> > > > 2. In KIP-33, should we make the time-based index interval configurable?
> > > > Perhaps we can default it to 60 secs, but allow users to configure it to
> > > > smaller values if they want more precision.
> > > >
> > > Yes, we can do that.
> > >
> > >
> > > > 3. In KIP-33, I am not sure if log rolling should be based on the
> > > > earliest message. This would mean that we will need to roll a log
> > > > segment every time we get a message delayed by the log rolling time
> > > > interval. Also, on broker startup, we can get the timestamp of the latest
> > > > message in a log segment pretty efficiently by just looking at the last
> > > > time index entry. But getting the timestamp of the earliest message
> > > > requires a full scan of all log segments, which can be expensive.
> > > > Previously, there were two use cases of time-based rolling: (a) more
> > > > accurate time-based indexing and (b) retaining data by time (since the
> > > > active segment is never deleted). (a) is already solved with a time-based
> > > > index. For (b), if the retention is based on the timestamp of the latest
> > > > message in a log segment, perhaps log rolling should be based on that
> > > > too.
> > > >
> > > I am not sure how to make log rolling work with the latest timestamp in
> > > the current log segment. Do you mean the log rolling can be based on the
> > > last log segment's latest timestamp? If so, how do we roll out the first
> > > segment?
> > >
> > >
> > > > 4. In KIP-33, I presume the timestamp in the time index will be
> > > > monotonically increasing. So, if all messages in a log segment have a
> > > > timestamp less than the largest timestamp in the previous log segment,
> > > > we will use the latter to index this log segment?
> > > >
> > > Yes. The timestamps are monotonically increasing. If the largest
> > > timestamp in the previous segment is very big, it is possible that the
> > > time index of the current segment only has two index entries (inserted
> > > during segment creation and roll out), both pointing to a message in the
> > > previous log segment. This is the corner case I mentioned before, in which
> > > we should expire the next log segment even before expiring the previous
> > > log segment just because the largest timestamp is in the previous log
> > > segment. In the current approach, we will wait until the previous log
> > > segment expires, and then delete both the previous log segment and the
> > > next log segment.
> > >
> > >
> > > > 5. In KIP-32, in the wire protocol, we mention both timestamp and time.
> > > > They should be consistent.
> > > >
> > > Will fix the wiki page.
> > >
> > >
> > > > Jun
> > > >
> > > >
> > > >
> > > > On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin <becket....@gmail.com> wrote:
> > > >
> > > > > Hey Jay,
> > > > >
> > > > > Thanks for the comments.
> > > > >
> > > > > Good point about the actions to take when max.message.time.difference
> > > > > is exceeded. Rejection is a useful behavior, although I cannot think of
> > > > > a use case at LinkedIn at this moment. I think it makes sense to add a
> > > > > configuration.
> > > > >
> > > > > How about the following configurations?
> > > > > 1. message.timestamp.type=CreateTime/LogAppendTime
> > > > > 2. max.message.time.difference.ms
> > > > >
> > > > > If message.timestamp.type is set to CreateTime, when the broker
> > > > > receives a message, it will further check max.message.time.difference.ms
> > > > > and will reject the message if the time difference exceeds the
> > > > > threshold.
> > > > > If message.timestamp.type is set to LogAppendTime, the broker will
> > > > > always stamp the message with the current server time, regardless of
> > > > > the value of max.message.time.difference.ms.
> > > > >
> > > > > This will make sure the messages on the broker are either CreateTime or
> > > > > LogAppendTime, but not a mixture of both.
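
A small Python sketch of the two-setting behavior described above. It is only
a reading of this proposal, not broker code: broker_timestamp and
RejectedMessage are hypothetical names, and the difference is assumed to be
measured as an absolute value in milliseconds.

```python
class RejectedMessage(Exception):
    """Raised when a CreateTime message is too far from the broker clock."""


def broker_timestamp(message_ts_ms, broker_now_ms, timestamp_type,
                     max_difference_ms):
    """Return the timestamp to store, or raise RejectedMessage (sketch)."""
    if timestamp_type == "LogAppendTime":
        # The broker always stamps with its own clock; the threshold is ignored.
        return broker_now_ms
    if timestamp_type != "CreateTime":
        raise ValueError("unknown timestamp type: %r" % (timestamp_type,))
    # CreateTime: keep the producer's timestamp, but only within the threshold.
    if abs(broker_now_ms - message_ts_ms) > max_difference_ms:
        raise RejectedMessage("timestamp is outside the allowed difference")
    return message_ts_ms
```

Either way, every stored timestamp in a topic comes from a single source: the
producer (CreateTime) or the broker (LogAppendTime).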
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > >
> > > > > > Hey Becket,
> > > > > >
> > > > > > That summary of pros and cons sounds about right to me.
> > > > > >
> > > > > > There are potentially two actions you could take when
> > > > > > max.message.time.difference is exceeded--override it or reject the
> > > > > > message entirely. Can we pick one of these or does the action need to
> > > > > > be configurable too? (I'm not sure). The downside of more
> > > > > > configuration is that it is more fiddly and has more modes.
> > > > > >
> > > > > > I suppose the reason I was thinking of this as a "difference" rather
> > > > > > than a hard type was that if you were going to go the reject mode you
> > > > > > would need some tolerance setting (i.e. if your SLA is that if your
> > > > > > timestamp is off by more than 10 minutes I give you an error). I
> > > > > > agree with you that having one field that is potentially containing a
> > > > > > mix of two values is a bit weird.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > > > > It looks like the format of the previous email was messed up.
> > > > > > > Sending it again.
> > > > > > >
> > > > > > > Just to recap, the last proposal Jay made (with some implementation
> > > > > > > details added) was:
> > > > > > >
> > > > > > > 1. Allow the user to stamp the message when producing.
> > > > > > >
> > > > > > > 2. When the broker receives a message, it looks at the difference
> > > > > > > between its local time and the timestamp in the message.
> > > > > > >   a. If the time difference is within a configurable
> > > > > > > max.message.time.difference.ms, the server will accept it and append
> > > > > > > it to the log.
> > > > > > >   b. If the time difference is beyond the configured
> > > > > > > max.message.time.difference.ms, the server will override the
> > > > > > > timestamp with its current local time and append the message to the
> > > > > > > log.
> > > > > > >   c. The default value of max.message.time.difference would be set
> > > > > > > to Long.MaxValue.
> > > > > > >
> > > > > > > 3. The configurable time difference threshold
> > > > > > > max.message.time.difference.ms will be a per-topic configuration.
> > > > > > >
> > > > > > > 4. The index will be built so that it has the following guarantees.
> > > > > > >   a. If a user searches by timestamp:
> > > > > > >       - all the messages after that timestamp will be consumed.
> > > > > > >       - the user might see earlier messages.
> > > > > > >   b. Log retention will look at the last entry in the time index
> > > > > > > file, because the last entry will be the latest timestamp in the
> > > > > > > entire log segment. If that entry expires, the log segment will be
> > > > > > > deleted.
> > > > > > >   c. Log rolling has to depend on the earliest timestamp. In this
> > > > > > > case we may need to keep an in-memory timestamp only for the current
> > > > > > > active log. On recovery, we will need to read the active log segment
> > > > > > > to get the timestamp of the earliest message.
> > > > > > >
> > > > > > > 5. The downsides of this proposal are:
> > > > > > >   a. The timestamp might not be monotonically increasing.
> > > > > > >   b. Log retention might become non-deterministic, i.e. when a
> > > > > > > message will be deleted now depends on the timestamps of the other
> > > > > > > messages in the same log segment. And those timestamps are provided
> > > > > > > by the user within a range depending on what the time difference
> > > > > > > threshold configuration is.
> > > > > > >   c. The semantic meaning of the timestamp in the messages could be
> > > > > > > a little bit vague because some of them come from the producer and
> > > > > > > some of them are overwritten by brokers.
> > > > > > >
> > > > > > > 6. Although the proposal has some downsides, it gives the user the
> > > > > > > flexibility to use the timestamp.
> > > > > > >   a. If the threshold is set to Long.MaxValue, the timestamp in the
> > > > > > > message is equivalent to CreateTime.
> > > > > > >   b. If the threshold is set to 0, the timestamp in the message is
> > > > > > > equivalent to LogAppendTime.
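
A short Python sketch of the override rule in point 2 and of the two extreme
thresholds in point 6. It is illustrative only: effective_timestamp is a
hypothetical name, LONG_MAX stands in for Java's Long.MaxValue, and the
difference is assumed to be an absolute value in milliseconds.

```python
LONG_MAX = 2**63 - 1  # stand-in for Java's Long.MaxValue


def effective_timestamp(message_ts_ms, broker_now_ms, max_difference_ms):
    """Timestamp the broker would store under the override proposal (sketch)."""
    if abs(broker_now_ms - message_ts_ms) <= max_difference_ms:
        # 2a: within the threshold, keep the producer's timestamp.
        return message_ts_ms
    # 2b: beyond the threshold, override with the broker's local time.
    return broker_now_ms


# 6a: threshold Long.MaxValue never overrides, so it behaves like CreateTime.
assert effective_timestamp(50, 100, LONG_MAX) == 50
# 6b: threshold 0 always yields the broker time, so it behaves like LogAppendTime.
assert effective_timestamp(50, 100, 0) == 100
```

Any threshold in between mixes the two sources, which is the ambiguity noted
in 5c.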
> > > > > > >
> > > > > > > This proposal actually allows the user to use either CreateTime or
> > > > > > > LogAppendTime without introducing two timestamp concepts at the same
> > > > > > > time. I have updated the wiki for KIP-32 and KIP-33 with this
> > > > > > > proposal.
> > > > > > >
> > > > > > > One thing I am thinking is that instead of having a time difference
> > > > > > > threshold, should we simply have a TimestampType configuration?
> > > > > > > Because in most cases, people will either set the threshold to 0 or
> > > > > > > Long.MaxValue. Setting anything in between will make the timestamp
> > > > > > > in the message meaningless to the user - the user doesn't know if
> > > > > > > the timestamp has been overwritten by the brokers.
> > > > > > >
> > > > > > > Any thoughts?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > >> On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > > > > > >>
> > > > > > >> > Hi Jay,
> > > > > > >> >
> > > > > > >> > Thanks for such a detailed explanation. I think we are both trying
> > > > > > >> > to make CreateTime work for us if possible. To me, "work" means
> > > > > > >> > clear guarantees on:
> > > > > > >> > 1. Log retention time enforcement.
> > > > > > >> > 2. Log rolling time enforcement (this might be less of a concern,
> > > > > > >> > as you pointed out).
> > > > > > >> > 3. Applications searching for messages by time.
> > > > > > >> >
> > > > > > >> > WRT (1), I agree the expectation for log retention might be
> > > > > > >> > different depending on who we ask. But my concern is about the
> > > > > > >> > level of guarantee we give to the user. My observation is that a
> > > > > > >> > clear guarantee to the user is critical regardless of the
> > > > > > >> > mechanism we choose. And this is the subtle but important
> > > > > > >> > difference between using LogAppendTime and CreateTime.
> > > > > > >> >
> > > > > > >> > Let's say a user asks this question: How long will my message stay
> > > > > > >> > in Kafka?
> > > > > > >> >
> > > > > > >> > If we use LogAppendTime for log retention, the answer is that the
> > > > > > >> > message will stay in Kafka for the retention time after the
> > > > > > >> > message is produced (to be more precise, upper bounded by
> > > > > > >> > log.rolling.ms + log.retention.ms). Users have a clear guarantee,
> > > > > > >> > and they may decide whether or not to put the message into Kafka,
> > > > > > >> > or how to adjust the retention time according to their
> > > > > > >> > requirements.
> > > > > > >> > If we use create time for log retention, the answer would be that
> > > > > > >> > it depends. The best answer we can give is "at least
> > > > > > >> > retention.ms", because there is no guarantee on when the messages
> > > > > > >> > will be deleted after that. If a message sits somewhere behind a
> > > > > > >> > larger create time, the message might stay longer than expected.
> > > > > > >> > But we don't know how much longer, because it depends on the
> > > > > > >> > create time. In this case, it is hard for the user to decide what
> > > > > > >> > to do.
> > > > > > >> >
> > > > > > >> > I am worried about this because a blurry guarantee has bitten us
> > > > > > >> > before, e.g. topic creation. We have received many questions like
> > > > > > >> > "why is my topic not there after I created it". I can imagine we
> > > > > > >> > would receive similar questions asking "why is my message still
> > > > > > >> > there after the retention time has been reached". So my
> > > > > > >> > understanding is that a clear and solid guarantee is better than
> > > > > > >> > having a mechanism that works in most cases but occasionally does
> > > > > > >> > not work.
> > > > > > >> >
> > > > > > >> > If we think of the retention guarantee we provide with
> > > > > > >> > LogAppendTime, it is not broken as you said, because we are
> > > > > > >> > telling the user that log retention is NOT based on create time in
> > > > > > >> > the first place.
> > > > > > >> >
> > > > > > >> > WRT (3), no matter whether we index on LogAppendTime or
> > > > > > >> > CreateTime, the best guarantee we can provide to the user is "not
> > > > > > >> > missing any message after a certain timestamp". Therefore I would
> > > > > > >> > actually really like to index on CreateTime, because that is the
> > > > > > >> > timestamp we provide to the user, and we can have a solid
> > > > > > >> > guarantee.
> > > > > > >> > On the other hand, indexing on LogAppendTime and giving the user
> > > > > > >> > CreateTime does not provide a solid guarantee when the user
> > > > > > >> > searches by timestamp. It only works when LogAppendTime is always
> > > > > > >> > no earlier than CreateTime. This is a reasonable assumption and we
> > > > > > >> > can easily enforce it.
> > > > > > >> >
> > > > > > >> > With the above, I am not sure if we can avoid the server timestamp
> > > > > > >> > and still make log retention work with a clear guarantee. For the
> > > > > > >> > search-by-timestamp use case, I really want to have the index
> > > > > > >> > built on CreateTime. But with a reasonable assumption and
> > > > > > >> > timestamp enforcement, a LogAppendTime index would also work.
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> >
> > > > > > >> > Jiangjie (Becket) Qin
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > >> >
> > > > > > >> >> Hey Becket,
> > > > > > >> >>
> > > > > > >> >> Let me see if I can address your concerns:
> > > > > > >> >>
> > > > > > >> >> 1. Let's say we have two source clusters that are mirrored to the
> > > > > > >> >> > same target cluster. For some reason one of the mirror makers
> > > > > > >> >> > from a cluster dies and after fixing the issue we want to resume
> > > > > > >> >> > mirroring. In this case it is possible that when the mirror
> > > > > > >> >> > maker resumes mirroring, the timestamps of the messages have
> > > > > > >> >> > already gone beyond the acceptable timestamp range on the
> > > > > > >> >> > broker. In order to let those messages go through, we have to
> > > > > > >> >> > bump up the *max.append.delay* for all the topics on the target
> > > > > > >> >> > broker. This could be painful.
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> Actually what I was suggesting was different. Here is my
> > > > > > >> >> observation: clusters/topics directly produced to by applications
> > > > > > >> >> have a valid assertion that log append time and create time are
> > > > > > >> >> similar (let's call these "unbuffered"); other clusters/topics,
> > > > > > >> >> such as those that receive data from a database, a log file, or
> > > > > > >> >> another Kafka cluster, don't have that assertion; for these
> > > > > > >> >> "buffered" clusters data can be arbitrarily late. This means any
> > > > > > >> >> use of log append time on these buffered clusters is not very
> > > > > > >> >> meaningful, and create time and log append time "should" be
> > > > > > >> >> similar on unbuffered clusters so you can probably use either.
> > > > > > >> >>
> > > > > > >> >> Using log append time on buffered clusters actually results in
> > > > > > >> >> bad things. If you request the offset for a given time you don't
> > > > > > >> >> end up getting data for that time but rather data that showed up
> > > > > > >> >> at that time. If you try to retain 7 days of data it may mostly
> > > > > > >> >> work but any kind of bootstrapping will result in retaining much
> > > > > > >> >> more (potentially the whole database contents!).
> > > > > > >> >>
> > > > > > >> >> So what I am suggesting in terms of the use of the
> > > > > > >> >> max.append.delay is that unbuffered clusters would have this set
> > > > > > >> >> and buffered clusters would not. In other words, in LI
> > > > > > >> >> terminology, tracking and metrics clusters would have this
> > > > > > >> >> enforced, aggregate and replica clusters wouldn't.
> > > > > > >> >>
> > > > > > >> >> So you DO have the issue of potentially maintaining more data
> > > > > > >> >> than you need to on aggregate clusters if your mirroring skews,
> > > > > > >> >> but you DON'T need to tweak the setting as you described.
> > > > > > >> >>
> > > > > > >> >> 2. Let's say in the above scenario we let the messages in, at that
> > > > > > >> >> > point some log segments in the target cluster might have a wide
> > > > > > >> >> > range of timestamps, like Guozhang mentioned the log rolling
> > > > > > >> >> > could be tricky because the first time index entry does not
> > > > > > >> >> > necessarily have the smallest timestamp of all the messages in
> > > > > > >> >> > the log segment. Instead, it is the largest timestamp ever seen.
> > > > > > >> >> > We have to scan the entire log to find the message with smallest
> > > > > > >> >> > offset to see if we should roll.
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> I think there are two uses for time-based log rolling:
> > > > > > >> >> 1. Making the offset lookup by timestamp work
> > > > > > >> >> 2. Ensuring we don't retain data indefinitely if it is supposed
> > > > > > >> >> to get purged after 7 days
> > > > > > >> >>
> > > > > > >> >> But think about these two use cases. (1) is totally obviated by
> > > > > > >> >> the time=>offset index we are adding which yields much more
> > > > > > >> >> granular offset lookups. (2) is actually totally broken if you
> > > > > > >> >> switch to append time, right? If you want to be sure for
> > > > > > >> >> security/privacy reasons you only retain 7 days of data then if
> > > > > > >> >> the log append and create time diverge you actually violate this
> > > > > > >> >> requirement.
> > > > > > >> >>
> > > > > > >> >> I think 95% of people care about (1) which is solved in the
> > > > > > >> >> proposal and (2) is actually broken today as well as in both
> > > > > > >> >> proposals.
> > > > > > >> >>
> > > > > > >> >> 3. Theoretically it is possible that an older log segment contains
> > > > > > >> >> > timestamps that are older than all the messages in a newer log
> > > > > > >> >> > segment. It would be weird that we are supposed to delete the
> > > > > > >> >> > newer log segment before we delete the older log segment.
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> The index timestamps would always be a lower bound (i.e. the
> > > > > > >> >> maximum at that time) so I don't think that is possible.
> > > > > > >> >>
> > > > > > >> >> 4. In bootstrap case, if we reload the data to a Kafka cluster, we
> > > > > > >> >> > have to make sure we configure the topic correctly before we
> > > > > > >> >> > load the data. Otherwise the message might either be rejected
> > > > > > >> >> > because the timestamp is too old, or it might be deleted
> > > > > > >> >> > immediately because the retention time has reached.
> > > > > > >> >>
> > > > > > >> >>
> > > > > > >> >> See (1).
> > > > > > >> >>
> > > > > > >> >> -Jay
> > > > > > >> >>
> > > > > > >> >> On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > > >> >>
> > > > > > >> >> > Hey Jay and Guozhang,
> > > > > > >> >> >
> > > > > > >> >> > Thanks a lot for the reply. So if I understand correctly,
> > > Jay's
> > > > > > >> proposal
> > > > > > >> >> > is:
> > > > > > >> >> >
> > > > > > >> >> > 1. Let client stamp the message create time.
> > > > > > > >> >> > 2. Broker builds index based on client-stamped message
> > create
> > > > > time.
> > > > > > > >> >> > 3. Broker only takes message whose create time is within
> > > > current
> > > > > > time
> > > > > > >> >> > plus/minus T (T is a configuration *max.append.delay*,
> > could
> > > be
> > > > > > topic
> > > > > > >> >> level
> > > > > > >> >> > configuration), if the timestamp is out of this range,
> > broker
> > > > > > rejects
> > > > > > >> >> the
> > > > > > >> >> > message.
> > > > > > >> >> > 4. Because the create time of messages can be out of
> order,
> > > > when
> > > > > > >> broker
> > > > > > >> >> > builds the time based index it only provides the
> guarantee
> > > that
> > > > > if
> > > > > > a
> > > > > > >> >> > consumer starts consuming from the offset returned by
> > > searching
> > > > > by
> > > > > > >> >> > timestamp t, they will not miss any message created after
> > t,
> > > > but
> > > > > > might
> > > > > > >> >> see
> > > > > > >> >> > some messages created before t.
> > > > > > >> >> >
> > > > > > >> >> > To build the time based index, every time when a broker
> > needs
> > > > to
> > > > > > >> insert
> > > > > > >> >> a
> > > > > > >> >> > new time index entry, the entry would be
> > > > > > {Largest_Timestamp_Ever_Seen
> > > > > > >> ->
> > > > > > >> >> > Current_Offset}. This basically means any timestamp
> larger
> > > than
> > > > > the
> > > > > > >> >> > Largest_Timestamp_Ever_Seen must come after this offset
> > > because
> > > > > it
> > > > > > >> never
> > > > > > >> >> > saw them before. So we don't miss any message with larger
> > > > > > timestamp.
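A minimal sketch of the index described above, assuming one entry is written every `every_n` appends. The class name `TimeIndex`, its methods, and the `every_n` parameter are hypothetical and not part of Kafka; the sketch only illustrates why an entry of the form {Largest_Timestamp_Ever_Seen -> Current_Offset} lets a timestamp search avoid missing later messages.

```python
import bisect


class TimeIndex:
    """Sparse index whose entries are (largest timestamp seen so far, offset)."""

    def __init__(self, base_offset=0, every_n=2):
        self.base_offset = base_offset
        self.every_n = every_n      # hypothetical sparsity knob
        self.timestamps = []        # non-decreasing by construction
        self.offsets = []
        self.largest_seen = -1
        self._pending = 0

    def on_append(self, offset, create_time):
        # Track the maximum, so out-of-order create times never make
        # the indexed timestamps go backward.
        self.largest_seen = max(self.largest_seen, create_time)
        self._pending += 1
        if self._pending >= self.every_n:
            self.timestamps.append(self.largest_seen)
            self.offsets.append(offset)
            self._pending = 0

    def lookup(self, t):
        # Last entry whose timestamp is < t: every message up to that
        # offset has a timestamp < t, so starting there misses nothing.
        i = bisect.bisect_left(self.timestamps, t)
        return self.base_offset if i == 0 else self.offsets[i - 1]


index = TimeIndex()
for offset, ts in enumerate([10, 12, 11, 20, 15, 30]):
    index.on_append(offset, ts)
start = index.lookup(15)
```

Reading from `start` may return some messages created before the requested time, which matches the guarantee stated in point 4.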
> > > > > > >> >> >
> > > > > > >> >> > (@Guozhang, in this case, for log retention we only need
> to
> > > > take
> > > > > a
> > > > > > >> look
> > > > > > >> >> at
> > > > > > >> >> > the last time index entry, because it must be the largest
> > > > > timestamp
> > > > > > >> >> ever,
> > > > > > >> >> > if that timestamp is overdue, we can safely delete any
> log
> > > > > segment
> > > > > > >> >> before
> > > > > > >> >> > that. So we don't need to scan the log segment file for
> log
> > > > > > retention)
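The retention check in the parenthetical above could look roughly like this. It is a sketch under the assumption that each segment exposes the timestamp of its last time index entry and that these values never decrease from older to newer segments; the function name and the tuple layout are hypothetical.

```python
def expired_segments(segments, now_ms, retention_ms):
    """Return base offsets of segments that can be deleted.

    segments: list of (base_offset, last_index_timestamp_ms), oldest first.
    Only the last index entry of each segment is consulted, so the log
    files themselves are never scanned.
    """
    deletable = []
    for base_offset, largest_ts in segments:
        if now_ms - largest_ts > retention_ms:
            deletable.append(base_offset)
        else:
            # Later segments hold timestamps that are at least as large.
            break
    return deletable


result = expired_segments(
    [(0, 1000), (100, 5000), (200, 9000)], now_ms=10000, retention_ms=6000
)
```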
> > > > > > >> >> >
> > > > > > >> >> > I assume that we are still going to have the new
> > FetchRequest
> > > > to
> > > > > > allow
> > > > > > >> >> the
> > > > > > >> >> > time index replication for replicas.
> > > > > > >> >> >
> > > > > > >> >> > I think Jay's main point here is that we don't want to
> have
> > > two
> > > > > > >> >> timestamp
> > > > > > >> >> > concepts in Kafka, which I agree is a reasonable concern.
> > > And I
> > > > > > also
> > > > > > >> >> agree
> > > > > > >> >> > that create time is more meaningful than LogAppendTime
> for
> > > > users.
> > > > > > But
> > > > > > >> I
> > > > > > >> >> am
> > > > > > > >> >> > not sure if making everything based on Create Time would
> > work
> > > in
> > > > > all
> > > > > > >> >> cases.
> > > > > > >> >> > Here are my questions about this approach:
> > > > > > >> >> >
> > > > > > >> >> > 1. Let's say we have two source clusters that are
> mirrored
> > to
> > > > the
> > > > > > same
> > > > > > > >> >> > target cluster. For some reason one of the mirror makers
> > from
> > > a
> > > > > > > cluster
> > > > > > > >> >> dies
> > > > > > > >> >> > and after fixing the issue we want to resume mirroring. In
> > this
> > > > case
> > > > > > it
> > > > > > >> is
> > > > > > >> >> > possible that when the mirror maker resumes mirroring,
> the
> > > > > > timestamp
> > > > > > >> of
> > > > > > >> >> the
> > > > > > >> >> > messages have already gone beyond the acceptable
> timestamp
> > > > range
> > > > > on
> > > > > > >> >> broker.
> > > > > > >> >> > In order to let those messages go through, we have to
> bump
> > up
> > > > the
> > > > > > >> >> > *max.append.delay
> > > > > > >> >> > *for all the topics on the target broker. This could be
> > > > painful.
> > > > > > >> >> >
> > > > > > >> >> > 2. Let's say in the above scenario we let the messages
> in,
> > at
> > > > > that
> > > > > > >> point
> > > > > > >> >> > some log segments in the target cluster might have a wide
> > > range
> > > > > of
> > > > > > >> >> > timestamps, like Guozhang mentioned the log rolling could
> > be
> > > > > tricky
> > > > > > >> >> because
> > > > > > >> >> > the first time index entry does not necessarily have the
> > > > smallest
> > > > > > >> >> timestamp
> > > > > > >> >> > of all the messages in the log segment. Instead, it is
> the
> > > > > largest
> > > > > > >> >> > timestamp ever seen. We have to scan the entire log to
> find
> > > the
> > > > > > >> message
> > > > > > > >> >> > with the smallest timestamp to see if we should roll.
> > > > > > >> >> >
> > > > > > >> >> > 3. Theoretically it is possible that an older log segment
> > > > > contains
> > > > > > >> >> > timestamps that are older than all the messages in a
> newer
> > > log
> > > > > > >> segment.
> > > > > > >> >> It
> > > > > > >> >> > would be weird that we are supposed to delete the newer
> log
> > > > > segment
> > > > > > >> >> before
> > > > > > >> >> > we delete the older log segment.
> > > > > > >> >> >
> > > > > > >> >> > 4. In bootstrap case, if we reload the data to a Kafka
> > > cluster,
> > > > > we
> > > > > > >> have
> > > > > > >> >> to
> > > > > > >> >> > make sure we configure the topic correctly before we load
> > the
> > > > > data.
> > > > > > >> >> > Otherwise the message might either be rejected because
> the
> > > > > > timestamp
> > > > > > >> is
> > > > > > >> >> too
> > > > > > >> >> > old, or it might be deleted immediately because the
> > retention
> > > > > time
> > > > > > has
> > > > > > > >> >> > been reached.
> > > > > > >> >> >
> > > > > > >> >> > I am very concerned about the operational overhead and
> the
> > > > > > ambiguity
> > > > > > >> of
> > > > > > >> >> > guarantees we introduce if we purely rely on CreateTime.
> > > > > > >> >> >
> > > > > > >> >> > It looks to me that the biggest issue of adopting
> > CreateTime
> > > > > > >> everywhere
> > > > > > >> >> is
> > > > > > >> >> > CreateTime can have big gaps. These gaps could be caused
> by
> > > > > several
> > > > > > >> >> cases:
> > > > > > >> >> > [1]. Faulty clients
> > > > > > >> >> > [2]. Natural delays from different source
> > > > > > >> >> > [3]. Bootstrap
> > > > > > >> >> > [4]. Failure recovery
> > > > > > >> >> >
> > > > > > > >> >> > Jay's alternative proposal solves [1], perhaps solves [2]
> as
> > > > well
> > > > > > if we
> > > > > > >> >> are
> > > > > > >> >> > able to set a reasonable max.append.delay. But it does
> not
> > > seem
> > > > > > > >> to address
> > > > > > >> >> [3]
> > > > > > >> >> > and [4]. I actually doubt if [3] and [4] are solvable
> > because
> > > > it
> > > > > > > looks like
> > > > > > >> >> the
> > > > > > >> >> > CreateTime gap is unavoidable in those two cases.
> > > > > > >> >> >
> > > > > > >> >> > Thanks,
> > > > > > >> >> >
> > > > > > >> >> > Jiangjie (Becket) Qin
> > > > > > >> >> >
> > > > > > >> >> >
> > > > > > >> >> > On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <
> > > > > wangg...@gmail.com
> > > > > > >
> > > > > > >> >> wrote:
> > > > > > >> >> >
> > > > > > >> >> > > Just to complete Jay's option, here is my
> understanding:
> > > > > > >> >> > >
> > > > > > >> >> > > 1. For log retention: if we want to remove data before
> > time
> > > > t,
> > > > > we
> > > > > > >> look
> > > > > > >> >> > into
> > > > > > >> >> > > the index file of each segment and find the largest
> > > timestamp
> > > > > t'
> > > > > > <
> > > > > > >> t,
> > > > > > >> >> > find
> > > > > > > >> >> > > the corresponding offset and start scanning to the
> end
> > > of
> > > > > the
> > > > > > >> >> segment,
> > > > > > >> >> > > if there is no entry with timestamp >= t, we can delete
> > > this
> > > > > > >> segment;
> > > > > > >> >> if
> > > > > > >> >> > a
> > > > > > >> >> > > segment's index smallest timestamp is larger than t, we
> > can
> > > > > skip
> > > > > > >> that
> > > > > > >> >> > > segment.
> > > > > > >> >> > >
> > > > > > >> >> > > 2. For log rolling: if we want to start a new segment
> > after
> > > > > time
> > > > > > t,
> > > > > > >> we
> > > > > > >> >> > look
> > > > > > >> >> > > into the active segment's index file, if the largest
> > > > timestamp
> > > > > is
> > > > > > >> >> > already >
> > > > > > >> >> > > t, we can roll a new segment immediately; if it is < t,
> > we
> > > > read
> > > > > > its
> > > > > > >> >> > > corresponding offset and start scanning to the end of
> the
> > > > > > segment,
> > > > > > >> if
> > > > > > >> >> we
> > > > > > >> >> > > find a record whose timestamp > t, we can roll a new
> > > segment.
> > > > > > >> >> > >
> > > > > > >> >> > > For log rolling we only need to possibly scan a small
> > > > portion of
> > > > > the
> > > > > > >> >> active
> > > > > > >> >> > > segment, which should be fine; for log retention we may
> > in
> > > > the
> > > > > > worst
> > > > > > >> >> case
> > > > > > >> >> > > end up scanning all segments, but in practice we may
> skip
> > > > most
> > > > > of
> > > > > > >> them
> > > > > > >> >> > > since their smallest timestamp in the index file is
> > larger
> > > > than
> > > > > > t.
> > > > > > >> >> > >
> > > > > > >> >> > > Guozhang
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> > > On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <
> > > > j...@confluent.io>
> > > > > > >> wrote:
> > > > > > >> >> > >
> > > > > > >> >> > > > I think it should be possible to index out-of-order
> > > > > timestamps.
> > > > > > >> The
> > > > > > >> >> > > > timestamp index would be similar to the offset
> index, a
> > > > > memory
> > > > > > >> >> mapped
> > > > > > >> >> > > file
> > > > > > >> >> > > > appended to as part of the log append, but would have
> > the
> > > > > > format
> > > > > > >> >> > > >   timestamp offset
> > > > > > >> >> > > > The timestamp entries would be monotonic and as with
> > the
> > > > > offset
> > > > > > >> >> index
> > > > > > >> >> > > would
> > > > > > > >> >> > > > be no more often than every 4k (or some configurable
> > > > > threshold
> > > > > > to
> > > > > > >> >> keep
> > > > > > >> >> > > the
> > > > > > >> >> > > > index small--actually for timestamp it could probably
> > be
> > > > much
> > > > > > more
> > > > > > >> >> > sparse
> > > > > > >> >> > > > than 4k).
> > > > > > >> >> > > >
> > > > > > >> >> > > > A search for a timestamp t yields an offset o before
> > > which
> > > > no
> > > > > > >> prior
> > > > > > >> >> > > message
> > > > > > >> >> > > > has a timestamp >= t. In other words if you read the
> > log
> > > > > > starting
> > > > > > >> >> with
> > > > > > >> >> > o
> > > > > > >> >> > > > you are guaranteed not to miss any messages occurring
> > at
> > > t
> > > > or
> > > > > > >> later
> > > > > > >> >> > > though
> > > > > > >> >> > > > you may get many before t (due to out-of-orderness).
> > > Unlike
> > > > > the
> > > > > > >> >> offset
> > > > > > >> >> > > > index this bound doesn't really have to be tight
> (i.e.
> > > > > > probably no
> > > > > > >> >> need
> > > > > > >> >> > > to
> > > > > > >> >> > > > go search the log itself, though you could).
> > > > > > >> >> > > >
> > > > > > >> >> > > > -Jay
> > > > > > >> >> > > >
> > > > > > >> >> > > > On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <
> > > > > j...@confluent.io>
> > > > > > >> >> wrote:
> > > > > > >> >> > > >
> > > > > > >> >> > > > > Here's my basic take:
> > > > > > >> >> > > > > - I agree it would be nice to have a notion of time
> > > baked
> > > > > in
> > > > > > if
> > > > > > >> it
> > > > > > >> >> > were
> > > > > > >> >> > > > > done right
> > > > > > >> >> > > > > - All the proposals so far seem pretty complex--I
> > think
> > > > > they
> > > > > > >> might
> > > > > > >> >> > make
> > > > > > >> >> > > > > things worse rather than better overall
> > > > > > >> >> > > > > - I think adding 2x8 byte timestamps to the message
> > is
> > > > > > probably
> > > > > > >> a
> > > > > > >> >> > > > > non-starter from a size perspective
> > > > > > >> >> > > > > - Even if it isn't in the message, having two
> notions
> > > of
> > > > > time
> > > > > > >> that
> > > > > > >> >> > > > control
> > > > > > >> >> > > > > different things is a bit confusing
> > > > > > >> >> > > > > - The mechanics of basing retention etc on log
> append
> > > > time
> > > > > > when
> > > > > > >> >> > that's
> > > > > > >> >> > > > not
> > > > > > >> >> > > > > in the log seem complicated
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > To that end here is a possible 4th option. Let me
> > know
> > > > what
> > > > > > you
> > > > > > >> >> > think.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > The basic idea is that the message creation time is
> > > > closest
> > > > > > to
> > > > > > >> >> what
> > > > > > >> >> > the
> > > > > > >> >> > > > > user actually cares about but is dangerous if set
> > > wrong.
> > > > So
> > > > > > >> rather
> > > > > > >> >> > than
> > > > > > >> >> > > > > substitute another notion of time, let's try to
> > ensure
> > > > the
> > > > > > >> >> > correctness
> > > > > > >> >> > > of
> > > > > > >> >> > > > > message creation time by preventing arbitrarily bad
> > > > message
> > > > > > >> >> creation
> > > > > > >> >> > > > times.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > First, let's see if we can agree that log append
> time
> > > is
> > > > > not
> > > > > > >> >> > something
> > > > > > >> >> > > > > anyone really cares about but rather an
> > implementation
> > > > > > detail.
> > > > > > >> The
> > > > > > >> >> > > > > timestamp that matters to the user is when the
> > message
> > > > > > occurred
> > > > > > >> >> (the
> > > > > > >> >> > > > > creation time). The log append time is basically
> just
> > > an
> > > > > > >> >> > approximation
> > > > > > >> >> > > to
> > > > > > >> >> > > > > this on the assumption that the message creation
> and
> > > the
> > > > > > message
> > > > > > >> >> > > receive
> > > > > > >> >> > > > on
> > > > > > >> >> > > > > the server occur pretty close together and the
> reason
> > > to
> > > > > > prefer
> > > > > > >> .
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > But as these values diverge the issue starts to
> > become
> > > > > > apparent.
> > > > > > >> >> Say
> > > > > > >> >> > > you
> > > > > > >> >> > > > > set the retention to one week and then mirror data
> > > from a
> > > > > > topic
> > > > > > >> >> > > > containing
> > > > > > >> >> > > > > two years of retention. Your intention is clearly
> to
> > > keep
> > > > > the
> > > > > > >> last
> > > > > > >> >> > > week,
> > > > > > >> >> > > > > but because the mirroring is appending right now
> you
> > > will
> > > > > > keep
> > > > > > >> two
> > > > > > >> >> > > years.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > The reason we are liking log append time is because
> > we
> > > > are
> > > > > > >> >> > > (justifiably)
> > > > > > >> >> > > > > concerned that in certain situations the creation
> > time
> > > > may
> > > > > > not
> > > > > > >> be
> > > > > > >> >> > > > > trustworthy. This same problem exists on the
> servers
> > > but
> > > > > > there
> > > > > > >> are
> > > > > > >> >> > > fewer
> > > > > > >> >> > > > > servers and they just run the kafka code so it is
> > less
> > > of
> > > > > an
> > > > > > >> >> issue.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > There are two possible ways to handle this:
> > > > > > >> >> > > > >
> > > > > > >> >> > > > >    1. Just tell people to add size based
> retention. I
> > > > think
> > > > > > this
> > > > > > >> >> is
> > > > > > >> >> > not
> > > > > > >> >> > > > >    entirely unreasonable, we're basically saying we
> > > > retain
> > > > > > data
> > > > > > >> >> based
> > > > > > >> >> > > on
> > > > > > >> >> > > > the
> > > > > > >> >> > > > >    timestamp you give us in the data. If you give
> us
> > > bad
> > > > > > data we
> > > > > > >> >> will
> > > > > > >> >> > > > retain
> > > > > > >> >> > > > >    it for a bad amount of time. If you want to
> ensure
> > > we
> > > > > > don't
> > > > > > >> >> retain
> > > > > > >> >> > > > "too
> > > > > > >> >> > > > >    much" data, define "too much" by setting a
> > > > size-based
> > > > > > >> retention
> > > > > > >> >> > > > setting.
> > > > > > >> >> > > > >    This is not entirely unreasonable but kind of
> > > suffers
> > > > > > from a
> > > > > > >> >> "one
> > > > > > >> >> > > bad
> > > > > > >> >> > > > >    apple" problem in a very large environment.
> > > > > > >> >> > > > >    2. Prevent bad timestamps. In general we can't
> > say a
> > > > > > >> timestamp
> > > > > > >> >> is
> > > > > > >> >> > > bad.
> > > > > > >> >> > > > >    However the definition we're implicitly using is
> > > that
> > > > we
> > > > > > >> think
> > > > > > >> >> > there
> > > > > > >> >> > > > are a
> > > > > > >> >> > > > >    set of topics/clusters where the creation
> > timestamp
> > > > > should
> > > > > > >> >> always
> > > > > > >> >> > be
> > > > > > >> >> > > > "very
> > > > > > >> >> > > > >    close" to the log append timestamp. This is true
> > for
> > > > > data
> > > > > > >> >> sources
> > > > > > >> >> > > > that have
> > > > > > >> >> > > > >    no buffering capability (which at LinkedIn is
> very
> > > > > common,
> > > > > > >> but
> > > > > > >> >> is
> > > > > > >> >> > > > more rare
> > > > > > >> >> > > > >    elsewhere). The solution in this case would be
> to
> > > > allow
> > > > > a
> > > > > > >> >> setting
> > > > > > >> >> > > > along the
> > > > > > >> >> > > > >    lines of max.append.delay which checks the
> > creation
> > > > > > timestamp
> > > > > > >> >> > > against
> > > > > > >> >> > > > the
> > > > > > >> >> > > > >    server time to look for too large a divergence.
> > The
> > > > > > solution
> > > > > > >> >> would
> > > > > > >> >> > > > either
> > > > > > >> >> > > > >    be to reject the message or to override it with
> > the
> > > > > server
> > > > > > >> >> time.
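As a rough illustration of option 2, the check below compares the creation timestamp against the server clock and either rejects the message or overrides the timestamp. The function name, the `on_violation` parameter, and the millisecond units are assumptions made for this sketch; only the max.append.delay idea comes from the proposal.

```python
def check_create_time(create_ms, server_ms, max_append_delay_ms,
                      on_violation="reject"):
    """Return the timestamp to store, or raise if the message is rejected."""
    # Accept anything within server time plus/minus the configured bound.
    if abs(server_ms - create_ms) <= max_append_delay_ms:
        return create_ms
    if on_violation == "override":
        # Fall back to the broker clock instead of refusing the message.
        return server_ms
    raise ValueError(
        f"create time {create_ms} is more than {max_append_delay_ms} ms "
        f"away from server time {server_ms}"
    )


TEN_MINUTES_MS = 10 * 60 * 1000
accepted = check_create_time(1_000_000, 1_000_500, TEN_MINUTES_MS)
overridden = check_create_time(0, 5_000_000, TEN_MINUTES_MS,
                               on_violation="override")
```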
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > So in LI's environment you would configure the
> > clusters
> > > > > used
> > > > > > for
> > > > > > >> >> > > direct,
> > > > > > >> >> > > > > unbuffered, message production (e.g. tracking and
> > > metrics
> > > > > > local)
> > > > > > >> >> to
> > > > > > >> >> > > > enforce
> > > > > > >> >> > > > > a reasonably aggressive timestamp bound (say 10
> > mins),
> > > > and
> > > > > > all
> > > > > > >> >> other
> > > > > > > >> >> > > > > clusters would just inherit these.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > The downside of this approach is requiring the
> > special
> > > > > > >> >> configuration.
> > > > > > >> >> > > > > However I think in 99% of environments this could
> be
> > > > > skipped
> > > > > > >> >> > entirely,
> > > > > > >> >> > > > it's
> > > > > > >> >> > > > > only when the ratio of clients to servers gets so
> > > massive
> > > > > > that
> > > > > > >> you
> > > > > > >> >> > need
> > > > > > >> >> > > > to
> > > > > > >> >> > > > > do this. The primary upside is that you have a
> single
> > > > > > >> >> authoritative
> > > > > > >> >> > > > notion
> > > > > > >> >> > > > > of time which is closest to what a user would want
> > and
> > > is
> > > > > > stored
> > > > > > >> >> > > directly
> > > > > > >> >> > > > > in the message.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > I'm also assuming there is a workable approach for
> > > > indexing
> > > > > > >> >> > > non-monotonic
> > > > > > >> >> > > > > timestamps, though I haven't actually worked that
> > out.
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > -Jay
> > > > > > >> >> > > > >
> > > > > > >> >> > > > > On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin
> > > > > > >> >> > <j...@linkedin.com.invalid
> > > > > > >> >> > > >
> > > > > > >> >> > > > > wrote:
> > > > > > >> >> > > > >
> > > > > > >> >> > > > >> Bumping up this thread although most of the
> > discussion
> > > > > was
> > > > > > on
> > > > > > >> >> the
> > > > > > >> >> > > > >> discussion thread of KIP-31 :)
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >> I just updated the KIP page to add detailed
> solution
> > > for
> > > > > the
> > > > > > >> >> option
> > > > > > >> >> > > > >> (Option
> > > > > > >> >> > > > >> 3) that does not expose the LogAppendTime to user.
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >>
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >> The option has a minor change to the fetch request
> > to
> > > > > allow
> > > > > > >> >> fetching
> > > > > > >> >> > > > time
> > > > > > >> >> > > > >> index entry as well. I kind of like this solution
> > > > because
> > > > > > > it's
> > > > > > >> >> just
> > > > > > >> >> > > doing
> > > > > > >> >> > > > >> what we need without introducing other things.
> > > > > > >> >> > > > >>
> > > > > > > >> >> > > > >> It would be great to hear your feedback. I
> can
> > > > > explain
> > > > > > >> more
> > > > > > >> >> > > during
> > > > > > >> >> > > > >> tomorrow's KIP hangout.
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >> Thanks,
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >> Jiangjie (Becket) Qin
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >> On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <
> > > > > > >> j...@linkedin.com
> > > > > > >> >> >
> > > > > > >> >> > > > wrote:
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >> > Hi Jay,
> > > > > > >> >> > > > >> >
> > > > > > > >> >> > > > >> > I just copied and pasted here your feedback on the
> > > timestamp
> > > > > > >> proposal
> > > > > > >> >> > that
> > > > > > >> >> > > > was
> > > > > > >> >> > > > >> > in the discussion thread of KIP-31. Please see
> the
> > > > > replies
> > > > > > >> >> inline.
> > > > > > >> >> > > > >> > The main change I made compared with previous
> > > proposal
> > > > > is
> > > > > > to
> > > > > > >> >> add
> > > > > > >> >> > > both
> > > > > > >> >> > > > >> > CreateTime and LogAppendTime to the message.
> > > > > > >> >> > > > >> >
> > > > > > >> >> > > > >> > On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <
> > > > > > j...@confluent.io
> > > > > > >> >
> > > > > > >> >> > > wrote:
> > > > > > >> >> > > > >> >
> > > > > > > >> >> > > > >> > > Hey Becket,
> > > > > > >> >> > > > >> > >
> > > > > > >> >> > > > >> > > I was proposing splitting up the KIP just for
> > > > > > simplicity of
> > > > > > >> >> > > > >> discussion.
> > > > > > >> >> > > > >> > You
> > > > > > >> >> > > > >> > > can still implement them in one patch. I think
> > > > > > otherwise it
> > > > > > >> >> will
> > > > > > >> >> > > be
> > > > > > >> >> > > > >> hard
> > > > > > >> >> > > > >> > to
> > > > > > >> >> > > > >> > > discuss/vote on them since if you like the
> > offset
> > > > > > proposal
> > > > > > >> >> but
> > > > > > >> >> > not
> > > > > > >> >> > > > the
> > > > > > >> >> > > > >> > time
> > > > > > >> >> > > > >> > > proposal what do you do?
> > > > > > >> >> > > > >> > >
> > > > > > >> >> > > > >> > > Introducing a second notion of time into Kafka
> > is
> > > a
> > > > > > pretty
> > > > > > >> >> > massive
> > > > > > >> >> > > > >> > > philosophical change so it kind of warrants
> it's
> > > own
> > > > > > KIP I
> > > > > > >> >> think
> > > > > > >> >> > > it
> > > > > > >> >> > > > >> > isn't
> > > > > > >> >> > > > >> > > just "Change message format".
> > > > > > >> >> > > > >> > >
> > > > > > >> >> > > > >> > > WRT time I think one thing to clarify in the
> > > > proposal
> > > > > is
> > > > > > >> how
> > > > > > >> >> MM
> > > > > > >> >> > > will
> > > > > > >> >> > > > >> have
> > > > > > >> >> > > > >> > > access to set the timestamp? Presumably this
> > will
> > > > be a
> > > > > > new
> > > > > > >> >> field
> > > > > > >> >> > > in
> > > > > > >> >> > > > >> > > ProducerRecord, right? If so then any user can
> > set
> > > > the
> > > > > > >> >> > timestamp,
> > > > > > >> >> > > > >> right?
> > > > > > >> >> > > > >> > > I'm not sure you answered the questions around
> > how
> > > > > this
> > > > > > >> will
> > > > > > >> >> > work
> > > > > > >> >> > > > for
> > > > > > >> >> > > > >> MM
> > > > > > >> >> > > > >> > > since when MM retains timestamps from multiple
> > > > > > partitions
> > > > > > >> >> they
> > > > > > >> >> > > will
> > > > > > >> >> > > > >> then
> > > > > > >> >> > > > >> > be
> > > > > > >> >> > > > >> > > out of order and in the past (so the
> > > > > > >> >> max(lastAppendedTimestamp,
> > > > > > >> >> > > > >> > > currentTimeMillis) override you proposed will
> > not
> > > > > work,
> > > > > > >> >> right?).
> > > > > > >> >> > > If
> > > > > > >> >> > > > we
> > > > > > >> >> > > > >> > > don't do this then when you set up mirroring
> the
> > > > data
> > > > > > will
> > > > > > >> >> all
> > > > > > >> >> > be
> > > > > > >> >> > > > new
> > > > > > >> >> > > > >> and
> > > > > > >> >> > > > >> > > you have the same retention problem you
> > described.
> > > > > > Maybe I
> > > > > > >> >> > missed
> > > > > > >> >> > > > >> > > something...?
> > > > > > >> >> > > > >> > lastAppendedTimestamp means the timestamp of the
> > > > message
> > > > > > that
> > > > > > > >> >> was last
> > > > > > >> >> > > > >> > appended to the log.
> > > > > > >> >> > > > >> > If a broker is a leader, since it will assign
> the
> > > > > > timestamp
> > > > > > >> by
> > > > > > >> >> > > itself,
> > > > > > >> >> > > > >> the
> > > > > > > >> >> > > > >> > lastAppendedTimestamp will be its local clock
> > when
> > > > it appends
> > > > > > the
> > > > > > >> >> last
> > > > > > >> >> > > > >> message.
> > > > > > >> >> > > > >> > So if there is no leader migration,
> > > > > > >> max(lastAppendedTimestamp,
> > > > > > >> >> > > > >> > currentTimeMillis) = currentTimeMillis.
> > > > > > >> >> > > > >> > If a broker is a follower, because it will keep
> > the
> > > > > > leader's
> > > > > > >> >> > > timestamp
> > > > > > >> >> > > > >> > unchanged, the lastAppendedTime would be the
> > > leader's
> > > > > > clock
> > > > > > >> >> when
> > > > > > >> >> > it
> > > > > > >> >> > > > >> appends
> > > > > > > >> >> > > > >> > that message. It keeps track of the
> > > > > > lastAppendedTime
> > > > > > >> >> only
> > > > > > >> >> > in
> > > > > > >> >> > > > >> case
> > > > > > >> >> > > > >> > it becomes leader later on. At that point, it is
> > > > > possible
> > > > > > >> that
> > > > > > >> >> the
> > > > > > >> >> > > > >> > timestamp of the last appended message was
> stamped
> > > by
> > > > > old
> > > > > > >> >> leader,
> > > > > > >> >> > > but
> > > > > > >> >> > > > >> the
> > > > > > >> >> > > > >> > new leader's currentTimeMillis <
> lastAppendedTime.
> > > If
> > > > a
> > > > > > new
> > > > > > >> >> > message
> > > > > > >> >> > > > >> comes,
> > > > > > > >> >> > > > >> > instead of stamping it with the new leader's
> > > > currentTimeMillis,
> > > > > > we
> > > > > > >> >> have
> > > > > > >> >> > to
> > > > > > >> >> > > > >> stamp
> > > > > > >> >> > > > >> > it to lastAppendedTime to avoid the timestamp in
> > the
> > > > log
> > > > > > >> going
> > > > > > >> >> > > > backward.
> > > > > > >> >> > > > >> > The max(lastAppendedTimestamp,
> currentTimeMillis)
> > is
> > > > > > purely
> > > > > > >> >> based
> > > > > > >> >> > on
> > > > > > >> >> > > > the
> > > > > > >> >> > > > >> > broker side clock. If MM produces message with
> > > > different
> > > > > > >> >> > > LogAppendTime
> > > > > > >> >> > > > >> in
> > > > > > >> >> > > > >> > source clusters to the same target cluster, the
> > > > > > LogAppendTime
> > > > > > >> >> will
> > > > > > >> >> > > be
> > > > > > > >> >> > > > >> > ignored and re-stamped by the target cluster.
> > > > > > >> >> > > > >> > I added a use case example for mirror maker in
> > > KIP-32.
> > > > > > Also
> > > > > > >> >> there
> > > > > > >> >> > > is a
> > > > > > >> >> > > > >> > corner case discussion about when we need the
> > > > > > >> >> > max(lastAppendedTime,
> > > > > > >> >> > > > >> > currentTimeMillis) trick. Could you take a look
> > and
> > > > see
> > > > > if
> > > > > > >> that
> > > > > > >> >> > > > answers
> > > > > > >> >> > > > >> > your question?
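The max(lastAppendedTimestamp, currentTimeMillis) rule described in this reply can be sketched as follows. The class and method names are hypothetical; the point is only that a new leader whose clock is behind the old leader's keeps stamping with lastAppendedTimestamp until its own clock catches up.

```python
class LogAppendStamper:
    """Assigns LogAppendTime so that it never goes backward in the log."""

    def __init__(self, last_appended_ms=-1):
        # A follower keeps this updated from the leader's timestamps so it
        # is ready if it later becomes leader.
        self.last_appended_ms = last_appended_ms

    def stamp(self, current_time_ms):
        self.last_appended_ms = max(self.last_appended_ms, current_time_ms)
        return self.last_appended_ms


# New leader: the old leader stamped 1000, but the local clock reads 990.
stamper = LogAppendStamper(last_appended_ms=1000)
first = stamper.stamp(990)    # clock is behind, reuse 1000
second = stamper.stamp(1005)  # clock has caught up
```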
> > > > > > >> >> > > > >> >
> > > > > > >> >> > > > >> > >
> > > > > > >> >> > > > >> > > My main motivation is that given that both
> Samza
> > > and
> > > > > > Kafka
> > > > > > >> >> > streams
> > > > > > >> >> > > > are
> > > > > > >> >> > > > >> > > doing work that implies a mandatory
> > client-defined
> > > > > > notion
> > > > > > >> of
> > > > > > >> >> > > time, I
> > > > > > >> >> > > > >> > really
> > > > > > >> >> > > > >> > > think introducing a different mandatory notion
> > of
> > > > time
> > > > > > in
> > > > > > >> >> Kafka
> > > > > > >> >> > is
> > > > > > >> >> > > > >> going
> > > > > > >> >> > > > >> > to
> > > > > > >> >> > > > >> > > be quite odd. We should think hard about how
> > > > > > client-defined
> > > > > > >> >> time
> > > > > > >> >> > > > could
> > > > > > >> >> > > > >> > > work. I'm not sure if it can, but I'm also not
> > > sure
> > > > > > that it
> > > > > > >> >> > can't.
> > > > > > >> >> > > > >> Having
> > > > > > >> >> > > > >> > > both will be odd. Did you chat about this with
> > > > > > Yi/Kartik on
> > > > > > >> >> the
> > > > > > >> >> > > > Samza
> > > > > > >> >> > > > >> > side?
> > > > > > >> >> > > > >> > I talked with Kartik and realized that it would
> be
> > > > > useful
> > > > > > to
> > > > > > >> >> have
> > > > > > >> >> > a
> > > > > > >> >> > > > >> client
> > > > > > >> >> > > > >> > timestamp to facilitate use cases like stream
> > > > > processing.
> > > > > > >> >> > > > >> > I was trying to figure out if we can simply use
> > > client
> > > > > > >> >> timestamp
> > > > > > >> >> > > > without
> > > > > > > >> >> > > > >> > introducing the server time. There is some
> > > discussion
> > > > > in
> > > > > > the
> > > > > > >> >> KIP.
> > > > > > >> >> > > > >> > The key problem we want to solve here is
> > > > > > >> >> > > > >> > 1. We want log retention and rolling to depend
> on
> > > > server
> > > > > > >> clock.
> > > > > > > >> >> > > > >> > 2. We want to make sure the log-associated
> > timestamp
> > > > to
> > > > > be
> > > > > > >> >> > retained
> > > > > > >> >> > > > when
> > > > > > > >> >> > > > >> > replicas move.
> > > > > > >> >> > > > >> > 3. We want to use the timestamp in some way that
> > can
> > > > > allow
> > > > > > >> >> > searching
> > > > > > >> >> > > > by
> > > > > > >> >> > > > >> > timestamp.
> > > > > > >> >> > > > >> > For 1 and 2, an alternative is to pass the
> > > > > log-associated
> > > > > > >> >> > timestamp
> > > > > > >> >> > > > >> > through replication, that means we need to have
> a
> > > > > > different
> > > > > > >> >> > protocol
> > > > > > >> >> > > > for
> > > > > > >> >> > > > >> > replica fetching to pass log-associated
> timestamp.
> > > It
> > > > is
> > > > > > >> >> actually
> > > > > > >> >> > > > >> > complicated and there could be a lot of corner
> > cases
> > > > to
> > > > > > >> handle.
> > > > > > >> >> > e.g.
> > > > > > >> >> > > > >> what
> > > > > > >> >> > > > >> > if an old leader started to fetch from the new
> > > leader,
> > > > > > should
> > > > > > >> >> it
> > > > > > >> >> > > also
> > > > > > >> >> > > > >> > update all of its old log segment timestamp?
> > > > > > > >> >> > > > >> > I think client side timestamp would actually be better for 3 if we
> > > > > > > >> >> > > > >> > can find a way to make it work.
> > > > > > > >> >> > > > >> > So far I am not able to convince myself that only having client
> > > > > > > >> >> > > > >> > side timestamp would work, mainly because of 1 and 2. There are a
> > > > > > > >> >> > > > >> > few situations I mentioned in the KIP.
> > > > > > >> >> > > > >> > >
> > > > > > > >> >> > > > >> > > When you are saying it won't work you are assuming some particular
> > > > > > > >> >> > > > >> > > implementation? Maybe that the index is a monotonically increasing
> > > > > > > >> >> > > > >> > > set of pointers to the least record with a timestamp larger than
> > > > > > > >> >> > > > >> > > the index time? In other words a search for time X gives the
> > > > > > > >> >> > > > >> > > largest offset at which all records are <= X?
> > > > > > > >> >> > > > >> > It is a promising idea. We can probably have an in-memory index
> > > > > > > >> >> > > > >> > like that, but it might be complicated to have a file on disk like
> > > > > > > >> >> > > > >> > that. Imagine there are two timestamps T0 < T1. We see message Y
> > > > > > > >> >> > > > >> > created at T1 and create an index like [T1->Y]. Then we see message
> > > > > > > >> >> > > > >> > X created at T0; supposedly the index should now look like
> > > > > > > >> >> > > > >> > [T0->X, T1->Y]. That is easy to do in memory, but we might have to
> > > > > > > >> >> > > > >> > rewrite the index file completely. Maybe we can have the first
> > > > > > > >> >> > > > >> > entry with timestamp set to 0, and only update the first pointer
> > > > > > > >> >> > > > >> > for any out-of-range timestamp, so the index will be
> > > > > > > >> >> > > > >> > [0->X, T1->Y]. Also, the ranges of timestamps in the log segments
> > > > > > > >> >> > > > >> > can overlap with each other. That means we either need to keep a
> > > > > > > >> >> > > > >> > cross-segment index file or we need to check the index files of
> > > > > > > >> >> > > > >> > all log segments.
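To make the two index layouts discussed above concrete, here is a minimal sketch, not Kafka's actual index code. The class names `InMemoryTimeIndex` and `SentinelTimeIndex` are hypothetical, and two details are assumptions the thread leaves open: a timestamp counts as "out of range" when it is older than the latest indexed timestamp, and the sentinel keeps only the first such offset.

```python
import bisect


class InMemoryTimeIndex:
    """Sorted (timestamp, offset) entries; a late timestamp is a cheap sorted insert."""

    def __init__(self):
        self.entries = []

    def add(self, timestamp, offset):
        # insort keeps the list ordered by timestamp, then offset.
        bisect.insort(self.entries, (timestamp, offset))


class SentinelTimeIndex:
    """Append-only entries plus a first entry pinned at timestamp 0.

    A late timestamp only updates the sentinel's pointer, so entries
    already written never have to be moved or rewritten.
    """

    def __init__(self):
        self.entries = [(0, None)]  # sentinel entry: timestamp 0, no offset yet

    def add(self, timestamp, offset):
        latest = self.entries[-1][0]
        if timestamp > latest:
            # In-order timestamp: plain append, as for an on-disk file.
            self.entries.append((timestamp, offset))
        elif timestamp < latest and self.entries[0][1] is None:
            # Assumption: remember only the first out-of-range offset.
            self.entries[0] = (0, offset)


# Message Y (offset 10) created at T1 = 200 arrives before
# message X (offset 11) created at T0 = 100.
mem, disk = InMemoryTimeIndex(), SentinelTimeIndex()
for ts, off in [(200, 10), (100, 11)]:
    mem.add(ts, off)
    disk.add(ts, off)
```

With this input the in-memory index ends up ordered as [T0->X, T1->Y], while the sentinel variant ends up as [0->X, T1->Y] without touching the T1 entry.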
> > > > > > > >> >> > > > >> > I separated out the time based log index to KIP-33 because it can
> > > > > > > >> >> > > > >> > be an independent follow up feature as Neha suggested. I will try
> > > > > > > >> >> > > > >> > to make the time based index work with client side timestamp.
> > > > > > >> >> > > > >> > >
> > > > > > > >> >> > > > >> > > For retention, I agree with the problem you point out, but I think
> > > > > > > >> >> > > > >> > > what you are saying in that case is that you want a size limit
> > > > > > > >> >> > > > >> > > too. If you use system time you actually hit the same problem: say
> > > > > > > >> >> > > > >> > > you do a full dump of a DB table with a setting of 7 days
> > > > > > > >> >> > > > >> > > retention, your retention will actually not get enforced for the
> > > > > > > >> >> > > > >> > > first 7 days because the data is "new to Kafka".
> > > > > > > >> >> > > > >> > I kind of think the size limit here is orthogonal. It is a valid
> > > > > > > >> >> > > > >> > use case for people to want time based retention only. In your
> > > > > > > >> >> > > > >> > example, depending on client timestamp might break the
> > > > > > > >> >> > > > >> > functionality - say it is a bootstrap case where people actually
> > > > > > > >> >> > > > >> > need to read all the data. If we depend on the client timestamp,
> > > > > > > >> >> > > > >> > the data might be deleted instantly when it arrives at the broker.
> > > > > > > >> >> > > > >> > It might be too demanding to expect the broker to understand what
> > > > > > > >> >> > > > >> > people actually want to do with the data coming in. So the
> > > > > > > >> >> > > > >> > guarantee of using server side timestamp is that "after appended
> > > > > > > >> >> > > > >> > to the log, all messages will be available on broker for retention
> > > > > > > >> >> > > > >> > time", which is not changeable by clients.
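The bootstrap case above can be illustrated with a small sketch. It assumes a simplified retention rule (a message is eligible for deletion once its timestamp is older than the retention window); `is_expired` is a hypothetical helper, not a broker API, and the timestamps are made-up values.

```python
DAY_MS = 24 * 60 * 60 * 1000
RETENTION_MS = 7 * DAY_MS  # 7 days retention, as in the example above


def is_expired(timestamp_ms, now_ms, retention_ms=RETENTION_MS):
    """Simplified rule: expired once the timestamp falls outside the window."""
    return now_ms - timestamp_ms > retention_ms


# A DB dump whose rows were created 30 days ago is appended to the log "now".
now_ms = 1000 * DAY_MS
create_time_ms = now_ms - 30 * DAY_MS  # client side timestamp
log_append_time_ms = now_ms            # server side timestamp

# Retention keyed on the client timestamp: eligible for deletion on arrival.
expired_by_create_time = is_expired(create_time_ms, now_ms)

# Retention keyed on the server timestamp: kept for the full window.
expired_by_append_time = is_expired(log_append_time_ms, now_ms)
expired_after_8_days = is_expired(log_append_time_ms, now_ms + 8 * DAY_MS)
```

Under this rule the same records are deletable immediately when retention follows the client timestamp, but stay available for the whole retention window when it follows the server timestamp.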
> > > > > > >> >> > > > >> > >
> > > > > > >> >> > > > >> > > -Jay
> > > > > > >> >> > > > >> >
> > > > > > > >> >> > > > >> > On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:
> > > > > > >> >> > > > >> >
> > > > > > >> >> > > > >> >> Hi folks,
> > > > > > >> >> > > > >> >>
> > > > > > > >> >> > > > >> >> This proposal was previously in KIP-31 and we separated it to
> > > > > > > >> >> > > > >> >> KIP-32 per Neha and Jay's suggestion.
> > > > > > >> >> > > > >> >>
> > > > > > > >> >> > > > >> >> The proposal is to add the following two timestamps to the Kafka
> > > > > > > >> >> > > > >> >> message.
> > > > > > >> >> > > > >> >> - CreateTime
> > > > > > >> >> > > > >> >> - LogAppendTime
> > > > > > >> >> > > > >> >>
> > > > > > > >> >> > > > >> >> The CreateTime will be set by the producer and will not change
> > > > > > > >> >> > > > >> >> after that.
> > > > > > > >> >> > > > >> >> The LogAppendTime will be set by the broker for purposes such as
> > > > > > > >> >> > > > >> >> enforcing log retention and log rolling.
> > > > > > >> >> > > > >> >>
> > > > > > >> >> > > > >> >> Thanks,
> > > > > > >> >> > > > >> >>
> > > > > > >> >> > > > >> >> Jiangjie (Becket) Qin
> > > > > > >> >> > > > >> >>
> > > > > > >> >> > > > >> >>
> > > > > > >> >> > > > >> >
> > > > > > >> >> > > > >>
> > > > > > >> >> > > > >
> > > > > > >> >> > > > >
> > > > > > >> >> > > >
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> > >
> > > > > > >> >> > > --
> > > > > > >> >> > > -- Guozhang
> > > > > > >> >> > >
> > > > > > >> >> >
> > > > > > >> >>
> > > > > > >> >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
