One motivation of my proposal is actually to avoid any clients trying to
read the timestamp type from the topic metadata response and behave
differently since:

1) topic metadata response is not always in-sync with the source-of-truth
(ZK), hence when the clients realized that the config has changed it may
already be too late (i.e. for consumer the records with the wrong timestamp
could already be returned to user).

2) the client logic could be a bit simpler, and this will benefit non-Java
development a lot. Also we can avoid adding this into the topic metadata
response.

Guozhang

On Tue, Jan 26, 2016 at 3:20 PM, Becket Qin <becket....@gmail.com> wrote:

> My hesitation for the changed protocol is that I think If we will have
> topic configuration returned in the topic metadata, the current protocol
> makes more sense. Because the timestamp type is a topic level setting so we
> don't need to put it into each message. That is assuming the timestamp type
> change on a topic rarely happens and if it is ever needed, the existing
> data should be wiped out.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jan 26, 2016 at 2:07 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Bump up this thread per discussion on the KIP hangout.
> >
> > During the implementation of the KIP, Guozhang raised another proposal on
> > how to indicate the message timestamp type used by messages. So we want
> to
> > see people's opinion on this proposal.
> >
> > The difference between current and the new proposal only differs on
> > messages that are a) compressed, and b) using LogAppendTime
> >
> > For compressed messages using LogAppendTime, the timestamps in the
> current
> > proposal is as below:
> > 1. When a producer produces the messages, it tries to set timestamp to -1
> > for inner messages if it knows LogAppendTime is used.
> > 2. When a broker receives the messages, it will overwrite the timestamp
> of
> > inner message to -1 if needed and write server time to the wrapper
> message.
> > Broker will do re-compression if inner message timestamp is overwritten.
> > 3. When a consumer receives the messages, it will see the inner message
> > timestamp is -1 so the wrapper message timestamp is used.
> >
> > Implementation wise, this proposal requires the producer to set timestamp
> > for inner messages correctly to avoid broker side re-compression. To do
> > that, the short term solution is to let producer infer the timestamp type
> > returned by broker in ProduceResponse and set correct timestamp
> afterwards.
> > This means the first few batches will still need re-compression on the
> > broker. The long term solution is to have producer get topic
> configuration
> > during metadata update.
> >
> >
> > The proposed modification is:
> > 1. When a producer produces the messages, it always using create time.
> > 2. When a broker receives the messages, it ignores the inner messages
> > timestamp, but simply set a wrapper message timestamp type attribute bit
> to
> > 1 and set the timestamp of the wrapper message to server time. (The
> broker
> > will also set the timesatmp type attribute bit accordingly for
> > non-compressed messages using LogAppendTime).
> > 3. When a consumer receives the messages, it checks timestamp type
> > attribute bit of wrapper message. If it is set to 1, the inner message's
> > timestamp will be ignored and the wrapper message's timestamp will be
> used.
> >
> > This approach uses an extra attribute bit. The good thing of the modified
> > protocol is consumers will be able to know the timestamp type. And
> > re-compression on broker side is completely avoided no matter what value
> is
> > sent by the producer. In this approach the inner messages will have wrong
> > timestamps.
> >
> > We want to see if people have concerns over the modified approach.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> >
> > On Tue, Dec 15, 2015 at 11:45 AM, Becket Qin <becket....@gmail.com>
> wrote:
> >
> >> Jun,
> >>
> >> 1. I agree it would be nice to have the timestamps used in a unified
> way.
> >> My concern is that if we let server change timestamp of the inner
> message
> >> for LogAppendTime, that will enforce the user who are using
> LogAppendTime
> >> to always pay the recompression penalty. So using LogAppendTime makes
> >> KIP-31 in vain.
> >>
> >> 4. If there are no entries in the log segment, we can read from the time
> >> index before the previous log segment. If there is no previous entry
> >> avaliable after we search until the earliest log segment, that means all
> >> the previous log segment with a valid time index entry has been
> deleted. In
> >> that case supposedly there should be only one log segment left - the
> active
> >> log segment, we can simply set the latest timestamp to 0.
> >>
> >> Guozhang,
> >>
> >> Sorry for the confusion. by "the timestamp of the latest message" I
> >> actually meant "the timestamp of the message with largest timestamp".
> So in
> >> your example the "latest message" is 5.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >> On Tue, Dec 15, 2015 at 10:02 AM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >>> Jun, Jiangjie,
> >>>
> >>> I am confused about 3) here, if we use "the timestamp of the latest
> >>> message"
> >>> then doesn't this mean we will roll the log whenever a message delayed
> by
> >>> rolling time is received as well? Just to clarify, my understanding of
> >>> "the
> >>> timestamp of the latest message", for example in the following log, is
> 1,
> >>> not 5:
> >>>
> >>> 2, 3, 4, 5, 1
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Mon, Dec 14, 2015 at 10:05 PM, Jun Rao <j...@confluent.io> wrote:
> >>>
> >>> > 1. Hmm, it's more intuitive if the consumer sees the same timestamp
> >>> whether
> >>> > the messages are compressed or not. When
> >>> > message.timestamp.type=LogAppendTime,
> >>> > we will need to set timestamp in each message if messages are not
> >>> > compressed, so that the follower can get the same timestamp. So, it
> >>> seems
> >>> > that we should do the same thing for inner messages when messages are
> >>> > compressed.
> >>> >
> >>> > 4. I thought on startup, we restore the timestamp of the latest
> >>> message by
> >>> > reading from the time index of the last log segment. So, what happens
> >>> if
> >>> > there are no index entries?
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Jun
> >>> >
> >>> > On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>> >
> >>> > > Thanks for the explanation, Jun.
> >>> > >
> >>> > > 1. That makes sense. So maybe we can do the following:
> >>> > > (a) Set the timestamp in the compressed message to latest timestamp
> >>> of
> >>> > all
> >>> > > its inner messages. This works for both LogAppendTime and
> CreateTime.
> >>> > > (b) If message.timestamp.type=LogAppendTime, the broker will
> >>> overwrite
> >>> > all
> >>> > > the inner message timestamp to -1 if they are not set to -1. This
> is
> >>> > mainly
> >>> > > for topics that are using LogAppendTime. Hopefully the producer
> will
> >>> set
> >>> > > the timestamp to -1 in the ProducerRecord to avoid server side
> >>> > > recompression.
> >>> > >
> >>> > > 3. I see. That works. So the semantic of log rolling becomes "roll
> >>> out
> >>> > the
> >>> > > log segment if it has been inactive since the latest message has
> >>> > arrived."
> >>> > >
> >>> > > 4. Yes. If the largest timestamp is in previous log segment. The
> time
> >>> > index
> >>> > > for the current log segment does not have a valid offset in current
> >>> log
> >>> > > segment to point to. Maybe in that case we should build an empty
> log
> >>> > index.
> >>> > >
> >>> > > Thanks,
> >>> > >
> >>> > > Jiangjie (Becket) Qin
> >>> > >
> >>> > >
> >>> > >
> >>> > > On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao <j...@confluent.io> wrote:
> >>> > >
> >>> > > > 1. I was thinking more about saving the decompression overhead in
> >>> the
> >>> > > > follower. Currently, the follower doesn't decompress the
> messages.
> >>> To
> >>> > > keep
> >>> > > > it that way, the outer message needs to include the timestamp of
> >>> the
> >>> > > latest
> >>> > > > inner message to build the time index in the follower. The
> simplest
> >>> > thing
> >>> > > > to do is to change the timestamp in the inner messages if
> >>> necessary, in
> >>> > > > which case there will be the recompression overhead. However, in
> >>> the
> >>> > case
> >>> > > > when the timestamp of the inner messages don't have to be changed
> >>> > > > (hopefully more common), there won't be the recompression
> >>> overhead. In
> >>> > > > either case, we always set the timestamp in the outer message to
> >>> be the
> >>> > > > timestamp of the latest inner message, in the leader.
> >>> > > >
> >>> > > > 3. Basically, in each log segment, we keep track of the timestamp
> >>> of
> >>> > the
> >>> > > > latest message. If current time - timestamp of latest message >
> log
> >>> > > rolling
> >>> > > > interval, we roll a new log segment. So, if messages with later
> >>> > > timestamps
> >>> > > > keep getting added, we only roll new log segments based on size.
> >>> On the
> >>> > > > other hand, if no new messages are added to a log, we can force a
> >>> log
> >>> > > roll
> >>> > > > based on time, which addresses the issue in (b).
> >>> > > >
> >>> > > > 4. Hmm, the index is per segment and should only point to
> >>> positions in
> >>> > > the
> >>> > > > corresponding .log file, not previous ones, right?
> >>> > > >
> >>> > > > Thanks,
> >>> > > >
> >>> > > > Jun
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin <
> becket....@gmail.com>
> >>> > > wrote:
> >>> > > >
> >>> > > > > Hi Jun,
> >>> > > > >
> >>> > > > > Thanks a lot for the comments. Please see inline replies.
> >>> > > > >
> >>> > > > > Thanks,
> >>> > > > >
> >>> > > > > Jiangjie (Becket) Qin
> >>> > > > >
> >>> > > > > On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao <j...@confluent.io>
> >>> wrote:
> >>> > > > >
> >>> > > > > > Hi, Becket,
> >>> > > > > >
> >>> > > > > > Thanks for the proposal. Looks good overall. A few comments
> >>> below.
> >>> > > > > >
> >>> > > > > > 1. KIP-32 didn't say what timestamp should be set in a
> >>> compressed
> >>> > > > > message.
> >>> > > > > > We probably should set it to the timestamp of the latest
> >>> messages
> >>> > > > > included
> >>> > > > > > in the compressed one. This way, during indexing, we don't
> >>> have to
> >>> > > > > > decompress the message.
> >>> > > > > >
> >>> > > > > That is a good point.
> >>> > > > > In normal cases, broker needs to decompress the message for
> >>> > > verification
> >>> > > > > purpose anyway. So building time index does not add additional
> >>> > > > > decompression.
> >>> > > > > During time index recovery, however, having a timestamp in
> >>> compressed
> >>> > > > > message might save the decompression.
> >>> > > > >
> >>> > > > > Another thing I am thinking is we should make sure KIP-32 works
> >>> well
> >>> > > with
> >>> > > > > KIP-31. i.e. we don't want to do recompression in order to add
> >>> > > timestamp
> >>> > > > to
> >>> > > > > messages.
> >>> > > > > Take the approach in my last email, the timestamp in the
> messages
> >>> > will
> >>> > > > > either all be overwritten by server if
> >>> > > > > message.timestamp.type=LogAppendTime, or they will not be
> >>> overwritten
> >>> > > if
> >>> > > > > message.timestamp.type=CreateTime.
> >>> > > > >
> >>> > > > > Maybe we can use the timestamp in compressed messages in the
> >>> > following
> >>> > > > way:
> >>> > > > > If message.timestamp.type=LogAppendTime, we have to overwrite
> >>> > > timestamps
> >>> > > > > for all the messages. We can simply write the timestamp in the
> >>> > > compressed
> >>> > > > > message to avoid recompression.
> >>> > > > > If message.timestamp.type=CreateTime, we do not need to
> >>> overwrite the
> >>> > > > > timestamps. We either reject the entire compressed message or
> We
> >>> just
> >>> > > > leave
> >>> > > > > the compressed message timestamp to be -1.
> >>> > > > >
> >>> > > > > So the semantic of the timestamp field in compressed message
> >>> field
> >>> > > > becomes:
> >>> > > > > if it is greater than 0, that means LogAppendTime is used, the
> >>> > > timestamp
> >>> > > > of
> >>> > > > > the inner messages is the compressed message LogAppendTime. If
> >>> it is
> >>> > > -1,
> >>> > > > > that means the CreateTime is used, the timestamp is in each
> >>> > individual
> >>> > > > > inner message.
> >>> > > > >
> >>> > > > > This sacrifice the speed of recovery but seems worthy because
> we
> >>> > avoid
> >>> > > > > recompression.
> >>> > > > >
> >>> > > > >
> >>> > > > > > 2. In KIP-33, should we make the time-based index interval
> >>> > > > configurable?
> >>> > > > > > Perhaps we can default it 60 secs, but allow users to
> >>> configure it
> >>> > to
> >>> > > > > > smaller values if they want more precision.
> >>> > > > > >
> >>> > > > > Yes, we can do that.
> >>> > > > >
> >>> > > > >
> >>> > > > > > 3. In KIP-33, I am not sure if log rolling should be based on
> >>> the
> >>> > > > > earliest
> >>> > > > > > message. This would mean that we will need to roll a log
> >>> segment
> >>> > > every
> >>> > > > > time
> >>> > > > > > we get a message delayed by the log rolling time interval.
> >>> Also, on
> >>> > > > > broker
> >>> > > > > > startup, we can get the timestamp of the latest message in a
> >>> log
> >>> > > > segment
> >>> > > > > > pretty efficiently by just looking at the last time index
> >>> entry.
> >>> > But
> >>> > > > > > getting the timestamp of the earliest timestamp requires a
> full
> >>> > scan
> >>> > > of
> >>> > > > > all
> >>> > > > > > log segments, which can be expensive. Previously, there were
> >>> two
> >>> > use
> >>> > > > > cases
> >>> > > > > > of time-based rolling: (a) more accurate time-based indexing
> >>> and
> >>> > (b)
> >>> > > > > > retaining data by time (since the active segment is never
> >>> deleted).
> >>> > > (a)
> >>> > > > > is
> >>> > > > > > already solved with a time-based index. For (b), if the
> >>> retention
> >>> > is
> >>> > > > > based
> >>> > > > > > on the timestamp of the latest message in a log segment,
> >>> perhaps
> >>> > log
> >>> > > > > > rolling should be based on that too.
> >>> > > > > >
> >>> > > > > I am not sure how to make log rolling work with the latest
> >>> timestamp
> >>> > in
> >>> > > > > current log segment. Do you mean the log rolling can based on
> the
> >>> > last
> >>> > > > log
> >>> > > > > segment's latest timestamp? If so how do we roll out the first
> >>> > segment?
> >>> > > > >
> >>> > > > >
> >>> > > > > > 4. In KIP-33, I presume the timestamp in the time index will
> be
> >>> > > > > > monotonically increasing. So, if all messages in a log
> segment
> >>> > have a
> >>> > > > > > timestamp less than the largest timestamp in the previous log
> >>> > > segment,
> >>> > > > we
> >>> > > > > > will use the latter to index this log segment?
> >>> > > > > >
> >>> > > > > Yes. The timestamps are monotonically increasing. If the
> largest
> >>> > > > timestamp
> >>> > > > > in the previous segment is very big, it is possible the time
> >>> index of
> >>> > > the
> >>> > > > > current segment only have two index entries (inserted during
> >>> segment
> >>> > > > > creation and roll out), both are pointing to a message in the
> >>> > previous
> >>> > > > log
> >>> > > > > segment. This is the corner case I mentioned before that we
> >>> should
> >>> > > expire
> >>> > > > > the next log segment even before expiring the previous log
> >>> segment
> >>> > just
> >>> > > > > because the largest timestamp is in previous log segment. In
> >>> current
> >>> > > > > approach, we will wait until the previous log segment expires,
> >>> and
> >>> > then
> >>> > > > > delete both the previous log segment and the next log segment.
> >>> > > > >
> >>> > > > >
> >>> > > > > > 5. In KIP-32, in the wire protocol, we mention both timestamp
> >>> and
> >>> > > time.
> >>> > > > > > They should be consistent.
> >>> > > > > >
> >>> > > > > Will fix the wiki page.
> >>> > > > >
> >>> > > > >
> >>> > > > > > Jun
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin <
> >>> becket....@gmail.com
> >>> > >
> >>> > > > > wrote:
> >>> > > > > >
> >>> > > > > > > Hey Jay,
> >>> > > > > > >
> >>> > > > > > > Thanks for the comments.
> >>> > > > > > >
> >>> > > > > > > Good point about the actions after when
> >>> > max.message.time.difference
> >>> > > > is
> >>> > > > > > > exceeded. Rejection is a useful behavior although I cannot
> >>> think
> >>> > of
> >>> > > > use
> >>> > > > > > > case at LinkedIn at this moment. I think it makes sense to
> >>> add a
> >>> > > > > > > configuration.
> >>> > > > > > >
> >>> > > > > > > How about the following configurations?
> >>> > > > > > > 1. message.timestamp.type=CreateTime/LogAppendTime
> >>> > > > > > > 2. max.message.time.difference.ms
> >>> > > > > > >
> >>> > > > > > > if message.timestamp.type is set to CreateTime, when the
> >>> broker
> >>> > > > > receives
> >>> > > > > > a
> >>> > > > > > > message, it will further check
> >>> max.message.time.difference.ms,
> >>> > and
> >>> > > > > will
> >>> > > > > > > reject the message it the time difference exceeds the
> >>> threshold.
> >>> > > > > > > If message.timestamp.type is set to LogAppendTime, the
> broker
> >>> > will
> >>> > > > > always
> >>> > > > > > > stamp the message with current server time, regardless the
> >>> value
> >>> > of
> >>> > > > > > > max.message.time.difference.ms
> >>> > > > > > >
> >>> > > > > > > This will make sure the message on the broker is either
> >>> > CreateTime
> >>> > > or
> >>> > > > > > > LogAppendTime, but not mixture of both.
> >>> > > > > > >
> >>> > > > > > > What do you think?
> >>> > > > > > >
> >>> > > > > > > Thanks,
> >>> > > > > > >
> >>> > > > > > > Jiangjie (Becket) Qin
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps <
> j...@confluent.io>
> >>> > > wrote:
> >>> > > > > > >
> >>> > > > > > > > Hey Becket,
> >>> > > > > > > >
> >>> > > > > > > > That summary of pros and cons sounds about right to me.
> >>> > > > > > > >
> >>> > > > > > > > There are potentially two actions you could take when
> >>> > > > > > > > max.message.time.difference is exceeded--override it or
> >>> reject
> >>> > > the
> >>> > > > > > > > message entirely. Can we pick one of these or does the
> >>> action
> >>> > > need
> >>> > > > to
> >>> > > > > > > > be configurable too? (I'm not sure). The downside of more
> >>> > > > > > > > configuration is that it is more fiddly and has more
> modes.
> >>> > > > > > > >
> >>> > > > > > > > I suppose the reason I was thinking of this as a
> >>> "difference"
> >>> > > > rather
> >>> > > > > > > > than a hard type was that if you were going to go the
> >>> reject
> >>> > mode
> >>> > > > you
> >>> > > > > > > > would need some tolerance setting (i.e. if your SLA is
> >>> that if
> >>> > > your
> >>> > > > > > > > timestamp is off by more than 10 minutes I give you an
> >>> error).
> >>> > I
> >>> > > > > agree
> >>> > > > > > > > with you that having one field that is potentially
> >>> containing a
> >>> > > mix
> >>> > > > > of
> >>> > > > > > > > two values is a bit weird.
> >>> > > > > > > >
> >>> > > > > > > > -Jay
> >>> > > > > > > >
> >>> > > > > > > > On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin <
> >>> > becket....@gmail.com
> >>> > > >
> >>> > > > > > wrote:
> >>> > > > > > > > > It looks the format of the previous email was messed
> up.
> >>> Send
> >>> > > it
> >>> > > > > > again.
> >>> > > > > > > > >
> >>> > > > > > > > > Just to recap, the last proposal Jay made (with some
> >>> > > > implementation
> >>> > > > > > > > > details added)
> >>> > > > > > > > > was:
> >>> > > > > > > > >
> >>> > > > > > > > > 1. Allow user to stamp the message when produce
> >>> > > > > > > > >
> >>> > > > > > > > > 2. When broker receives a message it take a look at the
> >>> > > > difference
> >>> > > > > > > > between
> >>> > > > > > > > > its local time and the timestamp in the message.
> >>> > > > > > > > >   a. If the time difference is within a configurable
> >>> > > > > > > > > max.message.time.difference.ms, the server will accept
> >>> it
> >>> > and
> >>> > > > > append
> >>> > > > > > > it
> >>> > > > > > > > to
> >>> > > > > > > > > the log.
> >>> > > > > > > > >   b. If the time difference is beyond the configured
> >>> > > > > > > > > max.message.time.difference.ms, the server will
> >>> override the
> >>> > > > > > timestamp
> >>> > > > > > > > with
> >>> > > > > > > > > its current local time and append the message to the
> log.
> >>> > > > > > > > >   c. The default value of max.message.time.difference
> >>> would
> >>> > be
> >>> > > > set
> >>> > > > > to
> >>> > > > > > > > > Long.MaxValue.
> >>> > > > > > > > >
> >>> > > > > > > > > 3. The configurable time difference threshold
> >>> > > > > > > > > max.message.time.difference.ms will
> >>> > > > > > > > > be a per topic configuration.
> >>> > > > > > > > >
> >>> > > > > > > > > 4. The indexed will be built so it has the following
> >>> > guarantee.
> >>> > > > > > > > >   a. If user search by time stamp:
> >>> > > > > > > > >       - all the messages after that timestamp will be
> >>> > consumed.
> >>> > > > > > > > >       - user might see earlier messages.
> >>> > > > > > > > >   b. The log retention will take a look at the last
> time
> >>> > index
> >>> > > > > entry
> >>> > > > > > in
> >>> > > > > > > > the
> >>> > > > > > > > > time index file. Because the last entry will be the
> >>> latest
> >>> > > > > timestamp
> >>> > > > > > in
> >>> > > > > > > > the
> >>> > > > > > > > > entire log segment. If that entry expires, the log
> >>> segment
> >>> > will
> >>> > > > be
> >>> > > > > > > > deleted.
> >>> > > > > > > > >   c. The log rolling has to depend on the earliest
> >>> timestamp.
> >>> > > In
> >>> > > > > this
> >>> > > > > > > > case
> >>> > > > > > > > > we may need to keep a in memory timestamp only for the
> >>> > current
> >>> > > > > active
> >>> > > > > > > > log.
> >>> > > > > > > > > On recover, we will need to read the active log segment
> >>> to
> >>> > get
> >>> > > > this
> >>> > > > > > > > timestamp
> >>> > > > > > > > > of the earliest messages.
> >>> > > > > > > > >
> >>> > > > > > > > > 5. The downside of this proposal are:
> >>> > > > > > > > >   a. The timestamp might not be monotonically
> increasing.
> >>> > > > > > > > >   b. The log retention might become non-deterministic.
> >>> i.e.
> >>> > > When
> >>> > > > a
> >>> > > > > > > > message
> >>> > > > > > > > > will be deleted now depends on the timestamp of the
> other
> >>> > > > messages
> >>> > > > > in
> >>> > > > > > > the
> >>> > > > > > > > > same log segment. And those timestamps are provided by
> >>> > > > > > > > > user within a range depending on what the time
> difference
> >>> > > > threshold
> >>> > > > > > > > > configuration is.
> >>> > > > > > > > >   c. The semantic meaning of the timestamp in the
> >>> messages
> >>> > > could
> >>> > > > > be a
> >>> > > > > > > > little
> >>> > > > > > > > > bit vague because some of them come from the producer
> and
> >>> > some
> >>> > > of
> >>> > > > > > them
> >>> > > > > > > > are
> >>> > > > > > > > > overwritten by brokers.
> >>> > > > > > > > >
> >>> > > > > > > > > 6. Although the proposal has some downsides, it gives
> >>> user
> >>> > the
> >>> > > > > > > > flexibility
> >>> > > > > > > > > to use the timestamp.
> >>> > > > > > > > >   a. If the threshold is set to Long.MaxValue. The
> >>> timestamp
> >>> > in
> >>> > > > the
> >>> > > > > > > > message is
> >>> > > > > > > > > equivalent to CreateTime.
> >>> > > > > > > > >   b. If the threshold is set to 0. The timestamp in the
> >>> > message
> >>> > > > is
> >>> > > > > > > > equivalent
> >>> > > > > > > > > to LogAppendTime.
> >>> > > > > > > > >
> >>> > > > > > > > > This proposal actually allows user to use either
> >>> CreateTime
> >>> > or
> >>> > > > > > > > LogAppendTime
> >>> > > > > > > > > without introducing two timestamp concept at the same
> >>> time. I
> >>> > > > have
> >>> > > > > > > > updated
> >>> > > > > > > > > the wiki for KIP-32 and KIP-33 with this proposal.
> >>> > > > > > > > >
> >>> > > > > > > > > One thing I am thinking is that instead of having a
> time
> >>> > > > difference
> >>> > > > > > > > threshold,
> >>> > > > > > > > > should we simply set have a TimestampType
> configuration?
> >>> > > Because
> >>> > > > in
> >>> > > > > > > most
> >>> > > > > > > > > cases, people will either set the threshold to 0 or
> >>> > > > Long.MaxValue.
> >>> > > > > > > > Setting
> >>> > > > > > > > > anything in between will make the timestamp in the
> >>> message
> >>> > > > > > meaningless
> >>> > > > > > > to
> >>> > > > > > > > > user - user don't know if the timestamp has been
> >>> overwritten
> >>> > by
> >>> > > > the
> >>> > > > > > > > brokers.
> >>> > > > > > > > >
> >>> > > > > > > > > Any thoughts?
> >>> > > > > > > > >
> >>> > > > > > > > > Thanks,
> >>> > > > > > > > > Jiangjie (Becket) Qin
> >>> > > > > > > > >
> >>> > > > > > > > > On Mon, Dec 7, 2015 at 10:33 AM, Jiangjie Qin
> >>> > > > > > > <j...@linkedin.com.invalid
> >>> > > > > > > > >
> >>> > > > > > > > > wrote:
> >>> > > > > > > > >
> >>> > > > > > > > >> Bump up this thread.
> >>> > > > > > > > >>
> >>> > > > > > > > >> Just to recap, the last proposal Jay made (with some
> >>> > > > > implementation
> >>> > > > > > > > details
> >>> > > > > > > > >> added) was:
> >>> > > > > > > > >>
> >>> > > > > > > > >>    1. Allow user to stamp the message when produce
> >>> > > > > > > > >>    2. When broker receives a message it take a look at
> >>> the
> >>> > > > > > difference
> >>> > > > > > > > >>    between its local time and the timestamp in the
> >>> message.
> >>> > > > > > > > >>       - If the time difference is within a
> configurable
> >>> > > > > > > > >>       max.message.time.difference.ms, the server will
> >>> > accept
> >>> > > it
> >>> > > > > and
> >>> > > > > > > > append
> >>> > > > > > > > >>       it to the log.
> >>> > > > > > > > >>       - If the time difference is beyond the
> configured
> >>> > > > > > > > >>       max.message.time.difference.ms, the server will
> >>> > > override
> >>> > > > > the
> >>> > > > > > > > >>       timestamp with its current local time and append
> >>> the
> >>> > > > message
> >>> > > > > > to
> >>> > > > > > > > the
> >>> > > > > > > > >> log.
> >>> > > > > > > > >>       - The default value of
> max.message.time.difference
> >>> > would
> >>> > > > be
> >>> > > > > > set
> >>> > > > > > > to
> >>> > > > > > > > >>       Long.MaxValue.
> >>> > > > > > > > >>       3. The configurable time difference threshold
> >>> > > > > > > > >>    max.message.time.difference.ms will be a per topic
> >>> > > > > > configuration.
> >>> > > > > > > > >>    4. The indexed will be built so it has the
> following
> >>> > > > guarantee.
> >>> > > > > > > > >>       - If user search by time stamp:
> >>> > > > > > > > >>    - all the messages after that timestamp will be
> >>> consumed.
> >>> > > > > > > > >>       - user might see earlier messages.
> >>> > > > > > > > >>       - The log retention will take a look at the last
> >>> time
> >>> > > > index
> >>> > > > > > > entry
> >>> > > > > > > > in
> >>> > > > > > > > >>       the time index file. Because the last entry will
> >>> be
> >>> > the
> >>> > > > > latest
> >>> > > > > > > > >> timestamp in
> >>> > > > > > > > >>       the entire log segment. If that entry expires,
> >>> the log
> >>> > > > > segment
> >>> > > > > > > > will
> >>> > > > > > > > >> be
> >>> > > > > > > > >>       deleted.
> >>> > > > > > > > >>       - The log rolling has to depend on the earliest
> >>> > > timestamp.
> >>> > > > > In
> >>> > > > > > > this
> >>> > > > > > > > >>       case we may need to keep a in memory timestamp
> >>> only
> >>> > for
> >>> > > > the
> >>> > > > > > > > >> current active
> >>> > > > > > > > >>       log. On recover, we will need to read the active
> >>> log
> >>> > > > segment
> >>> > > > > > to
> >>> > > > > > > > get
> >>> > > > > > > > >> this
> >>> > > > > > > > >>       timestamp of the earliest messages.
> >>> > > > > > > > >>    5. The downside of this proposal are:
> >>> > > > > > > > >>       - The timestamp might not be monotonically
> >>> increasing.
> >>> > > > > > > > >>       - The log retention might become
> >>> non-deterministic.
> >>> > i.e.
> >>> > > > > When
> >>> > > > > > a
> >>> > > > > > > > >>       message will be deleted now depends on the
> >>> timestamp
> >>> > of
> >>> > > > the
> >>> > > > > > > > >> other messages
> >>> > > > > > > > >>       in the same log segment. And those timestamps
> are
> >>> > > provided
> >>> > > > > by
> >>> > > > > > > > >> user within a
> >>> > > > > > > > >>       range depending on what the time difference
> >>> threshold
> >>> > > > > > > > configuration
> >>> > > > > > > > >> is.
> >>> > > > > > > > >>       - The semantic meaning of the timestamp in the
> >>> > messages
> >>> > > > > could
> >>> > > > > > > be a
> >>> > > > > > > > >>       little bit vague because some of them come from
> >>> the
> >>> > > > producer
> >>> > > > > > and
> >>> > > > > > > > >> some of
> >>> > > > > > > > >>       them are overwritten by brokers.
> >>> > > > > > > > >>       6. Although the proposal has some downsides, it
> >>> gives
> >>> > > user
> >>> > > > > the
> >>> > > > > > > > >>    flexibility to use the timestamp.
> >>> > > > > > > > >>    - If the threshold is set to Long.MaxValue. The
> >>> timestamp
> >>> > > in
> >>> > > > > the
> >>> > > > > > > > message
> >>> > > > > > > > >>       is equivalent to CreateTime.
> >>> > > > > > > > >>       - If the threshold is set to 0. The timestamp in
> >>> the
> >>> > > > message
> >>> > > > > > is
> >>> > > > > > > > >>       equivalent to LogAppendTime.
> >>> > > > > > > > >>
> >>> > > > > > > > >> This proposal actually allows user to use either
> >>> CreateTime
> >>> > or
> >>> > > > > > > > >> LogAppendTime without introducing two timestamp
> concept
> >>> at
> >>> > the
> >>> > > > > same
> >>> > > > > > > > time. I
> >>> > > > > > > > >> have updated the wiki for KIP-32 and KIP-33 with this
> >>> > > proposal.
> >>> > > > > > > > >>
> >>> > > > > > > > >> One thing I am thinking is that instead of having a
> time
> >>> > > > > difference
> >>> > > > > > > > >> threshold, should we simply set have a TimestampType
> >>> > > > > configuration?
> >>> > > > > > > > Because
> >>> > > > > > > > >> in most cases, people will either set the threshold to
> >>> 0 or
> >>> > > > > > > > Long.MaxValue.
> >>> > > > > > > > >> Setting anything in between will make the timestamp in
> >>> the
> >>> > > > message
> >>> > > > > > > > >> meaningless to user - user don't know if the timestamp
> >>> has
> >>> > > been
> >>> > > > > > > > overwritten
> >>> > > > > > > > >> by the brokers.
> >>> > > > > > > > >>
> >>> > > > > > > > >> Any thoughts?
> >>> > > > > > > > >>
> >>> > > > > > > > >> Thanks,
> >>> > > > > > > > >> Jiangjie (Becket) Qin
> >>> > > > > > > > >>
> >>> > > > > > > > >> On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin <
> >>> > > > j...@linkedin.com>
> >>> > > > > > > > wrote:
> >>> > > > > > > > >>
> >>> > > > > > > > >> > Hi Jay,
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > Thanks for such detailed explanation. I think we
> both
> >>> are
> >>> > > > trying
> >>> > > > > > to
> >>> > > > > > > > make
> >>> > > > > > > > >> > CreateTime work for us if possible. To me by "work"
> it
> >>> > means
> >>> > > > > clear
> >>> > > > > > > > >> > guarantees on:
> >>> > > > > > > > >> > 1. Log Retention Time enforcement.
> >>> > > > > > > > >> > 2. Log Rolling time enforcement (This might be less
> a
> >>> > > concern
> >>> > > > as
> >>> > > > > > you
> >>> > > > > > > > >> > pointed out)
> >>> > > > > > > > >> > 3. Application search message by time.
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > WRT (1), I agree the expectation for log retention
> >>> might
> >>> > be
> >>> > > > > > > different
> >>> > > > > > > > >> > depending on who we ask. But my concern is about the
> >>> level
> >>> > > of
> >>> > > > > > > > guarantee
> >>> > > > > > > > >> we
> >>> > > > > > > > >> > give to user. My observation is that a clear
> >>> guarantee to
> >>> > > user
> >>> > > > > is
> >>> > > > > > > > >> critical
> >>> > > > > > > > >> > regardless of the mechanism we choose. And this is
> the
> >>> > > subtle
> >>> > > > > but
> >>> > > > > > > > >> important
> >>> > > > > > > > >> > difference between using LogAppendTime and
> CreateTime.
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > Let's say user asks this question: How long will my
> >>> > message
> >>> > > > stay
> >>> > > > > > in
> >>> > > > > > > > >> Kafka?
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > If we use LogAppendTime for log retention, the
> answer
> >>> is
> >>> > > > message
> >>> > > > > > > will
> >>> > > > > > > > >> stay
> >>> > > > > > > > >> > in Kafka for retention time after the message is
> >>> produced
> >>> > > (to
> >>> > > > be
> >>> > > > > > > more
> >>> > > > > > > > >> > precise, upper bounded by log.rolling.ms +
> >>> > log.retention.ms
> >>> > > ).
> >>> > > > > > User
> >>> > > > > > > > has a
> >>> > > > > > > > >> > clear guarantee and they may decide whether or not
> to
> >>> put
> >>> > > the
> >>> > > > > > > message
> >>> > > > > > > > >> into
> >>> > > > > > > > >> > Kafka. Or how to adjust the retention time according
> >>> to
> >>> > > their
> >>> > > > > > > > >> requirements.
> >>> > > > > > > > >> > If we use create time for log retention, the answer
> >>> would
> >>> > be
> >>> > > > it
> >>> > > > > > > > depends.
> >>> > > > > > > > >> > The best answer we can give is at least
> retention.ms
> >>> > > because
> >>> > > > > > there
> >>> > > > > > > > is no
> >>> > > > > > > > >> > guarantee when the messages will be deleted after
> >>> that.
> >>> > If a
> >>> > > > > > message
> >>> > > > > > > > sits
> >>> > > > > > > > >> > somewhere behind a larger create time, the message
> >>> might
> >>> > > stay
> >>> > > > > > longer
> >>> > > > > > > > than
> >>> > > > > > > > >> > expected. But we don't know how longer it would be
> >>> because
> >>> > > it
> >>> > > > > > > depends
> >>> > > > > > > > on
> >>> > > > > > > > >> > the create time. In this case, it is hard for user
> to
> >>> > decide
> >>> > > > > what
> >>> > > > > > to
> >>> > > > > > > > do.
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > I am worrying about this because a blurring
> guarantee
> >>> has
> >>> > > > bitten
> >>> > > > > > us
> >>> > > > > > > > >> > before, e.g. Topic creation. We have received many
> >>> > questions
> >>> > > > > like
> >>> > > > > > > > "why my
> >>> > > > > > > > >> > topic is not there after I created it". I can
> imagine
> >>> we
> >>> > > > receive
> >>> > > > > > > > similar
> >>> > > > > > > > >> > question asking "why my message is still there after
> >>> > > retention
> >>> > > > > > time
> >>> > > > > > > > has
> >>> > > > > > > > >> > reached". So my understanding is that a clear and
> >>> solid
> >>> > > > > guarantee
> >>> > > > > > is
> >>> > > > > > > > >> better
> >>> > > > > > > > >> > than having a mechanism that works in most cases but
> >>> > > > > occasionally
> >>> > > > > > > does
> >>> > > > > > > > >> not
> >>> > > > > > > > >> > work.
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > If we think of the retention guarantee we provide
> with
> >>> > > > > > > LogAppendTime,
> >>> > > > > > > > it
> >>> > > > > > > > >> > is not broken as you said, because we are telling
> >>> user the
> >>> > > log
> >>> > > > > > > > retention
> >>> > > > > > > > >> is
> >>> > > > > > > > >> > NOT based on create time at the first place.
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > WRT (3), no matter whether we index on LogAppendTime
> >>> or
> >>> > > > > > CreateTime,
> >>> > > > > > > > the
> >>> > > > > > > > >> > best guarantee we can provide with user is "not
> >>> missing
> >>> > > > message
> >>> > > > > > > after
> >>> > > > > > > > a
> >>> > > > > > > > >> > certain timestamp". Therefore I actually really like
> >>> to
> >>> > > index
> >>> > > > on
> >>> > > > > > > > >> CreateTime
> >>> > > > > > > > >> > because that is the timestamp we provide to user,
> and
> >>> we
> >>> > can
> >>> > > > > have
> >>> > > > > > > the
> >>> > > > > > > > >> solid
> >>> > > > > > > > >> > guarantee.
> >>> > > > > > > > >> > On the other hand, indexing on LogAppendTime and
> >>> giving
> >>> > user
> >>> > > > > > > > CreateTime
> >>> > > > > > > > >> > does not provide solid guarantee when user do search
> >>> based
> >>> > > on
> >>> > > > > > > > timestamp.
> >>> > > > > > > > >> It
> >>> > > > > > > > >> > only works when LogAppendTime is always no earlier
> >>> than
> >>> > > > > > CreateTime.
> >>> > > > > > > > This
> >>> > > > > > > > >> is
> >>> > > > > > > > >> > a reasonable assumption and we can easily enforce
> it.
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > With above, I am not sure if we can avoid server
> >>> timestamp
> >>> > > to
> >>> > > > > make
> >>> > > > > > > log
> >>> > > > > > > > >> > retention work with a clear guarantee. For searching
> >>> by
> >>> > > > > timestamp
> >>> > > > > > > use
> >>> > > > > > > > >> case,
> >>> > > > > > > > >> > I really want to have the index built on CreateTime.
> >>> But
> >>> > > with
> >>> > > > a
> >>> > > > > > > > >> reasonable
> >>> > > > > > > > >> > assumption and timestamp enforcement, a
> LogAppendTime
> >>> > index
> >>> > > > > would
> >>> > > > > > > also
> >>> > > > > > > > >> work.
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > Thanks,
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > Jiangjie (Becket) Qin
> >>> > > > > > > > >> >
> >>> > > > > > > > >> >
> >>> > > > > > > > >> >
> >>> > > > > > > > >> > On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps <
> >>> > > j...@confluent.io
> >>> > > > >
> >>> > > > > > > wrote:
> >>> > > > > > > > >> >
> >>> > > > > > > > >> >> Hey Becket,
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> Let me see if I can address your concerns:
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> 1. Let's say we have two source clusters that are
> >>> > mirrored
> >>> > > to
> >>> > > > > the
> >>> > > > > > > > same
> >>> > > > > > > > >> >> > target cluster. For some reason one of the mirror
> >>> maker
> >>> > > > from
> >>> > > > > a
> >>> > > > > > > > cluster
> >>> > > > > > > > >> >> dies
> >>> > > > > > > > >> >> > and after fix the issue we want to resume
> >>> mirroring. In
> >>> > > > this
> >>> > > > > > case
> >>> > > > > > > > it
> >>> > > > > > > > >> is
> >>> > > > > > > > >> >> > possible that when the mirror maker resumes
> >>> mirroring,
> >>> > > the
> >>> > > > > > > > timestamp
> >>> > > > > > > > >> of
> >>> > > > > > > > >> >> the
> >>> > > > > > > > >> >> > messages have already gone beyond the acceptable
> >>> > > timestamp
> >>> > > > > > range
> >>> > > > > > > on
> >>> > > > > > > > >> >> broker.
> >>> > > > > > > > >> >> > In order to let those messages go through, we
> have
> >>> to
> >>> > > bump
> >>> > > > up
> >>> > > > > > the
> >>> > > > > > > > >> >> > *max.append.delay
> >>> > > > > > > > >> >> > *for all the topics on the target broker. This
> >>> could be
> >>> > > > > > painful.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> Actually what I was suggesting was different. Here
> >>> is my
> >>> > > > > > > observation:
> >>> > > > > > > > >> >> clusters/topics directly produced to by
> applications
> >>> > have a
> >>> > > > > valid
> >>> > > > > > > > >> >> assertion
> >>> > > > > > > > >> >> that log append time and create time are similar
> >>> (let's
> >>> > > call
> >>> > > > > > these
> >>> > > > > > > > >> >> "unbuffered"); other cluster/topic such as those
> that
> >>> > > receive
> >>> > > > > > data
> >>> > > > > > > > from
> >>> > > > > > > > >> a
> >>> > > > > > > > >> >> database, a log file, or another kafka cluster
> don't
> >>> have
> >>> > > > that
> >>> > > > > > > > >> assertion,
> >>> > > > > > > > >> >> for these "buffered" clusters data can be
> arbitrarily
> >>> > late.
> >>> > > > > This
> >>> > > > > > > > means
> >>> > > > > > > > >> any
> >>> > > > > > > > >> >> use of log append time on these buffered clusters
> is
> >>> not
> >>> > > very
> >>> > > > > > > > >> meaningful,
> >>> > > > > > > > >> >> and create time and log append time "should" be
> >>> similar
> >>> > on
> >>> > > > > > > unbuffered
> >>> > > > > > > > >> >> clusters so you can probably use either.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> Using log append time on buffered clusters actually
> >>> > results
> >>> > > > in
> >>> > > > > > bad
> >>> > > > > > > > >> things.
> >>> > > > > > > > >> >> If you request the offset for a given time you get
> >>> don't
> >>> > > end
> >>> > > > up
> >>> > > > > > > > getting
> >>> > > > > > > > >> >> data for that time but rather data that showed up
> at
> >>> that
> >>> > > > time.
> >>> > > > > > If
> >>> > > > > > > > you
> >>> > > > > > > > >> try
> >>> > > > > > > > >> >> to retain 7 days of data it may mostly work but any
> >>> kind
> >>> > of
> >>> > > > > > > > >> bootstrapping
> >>> > > > > > > > >> >> will result in retaining much more (potentially the
> >>> whole
> >>> > > > > > database
> >>> > > > > > > > >> >> contents!).
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> So what I am suggesting in terms of the use of the
> >>> > > > > > max.append.delay
> >>> > > > > > > > is
> >>> > > > > > > > >> >> that
> >>> > > > > > > > >> >> unbuffered clusters would have this set and
> buffered
> >>> > > clusters
> >>> > > > > > would
> >>> > > > > > > > not.
> >>> > > > > > > > >> >> In
> >>> > > > > > > > >> >> other words, in LI terminology, tracking and
> metrics
> >>> > > clusters
> >>> > > > > > would
> >>> > > > > > > > have
> >>> > > > > > > > >> >> this enforced, aggregate and replica clusters
> >>> wouldn't.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> So you DO have the issue of potentially maintaining
> >>> more
> >>> > > data
> >>> > > > > > than
> >>> > > > > > > > you
> >>> > > > > > > > >> >> need
> >>> > > > > > > > >> >> to on aggregate clusters if your mirroring skews,
> >>> but you
> >>> > > > DON'T
> >>> > > > > > > need
> >>> > > > > > > > to
> >>> > > > > > > > >> >> tweak the setting as you described.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> 2. Let's say in the above scenario we let the
> >>> messages
> >>> > in,
> >>> > > at
> >>> > > > > > that
> >>> > > > > > > > point
> >>> > > > > > > > >> >> > some log segments in the target cluster might
> have
> >>> a
> >>> > wide
> >>> > > > > range
> >>> > > > > > > of
> >>> > > > > > > > >> >> > timestamps, like Guozhang mentioned the log
> rolling
> >>> > could
> >>> > > > be
> >>> > > > > > > tricky
> >>> > > > > > > > >> >> because
> >>> > > > > > > > >> >> > the first time index entry does not necessarily
> >>> have
> >>> > the
> >>> > > > > > smallest
> >>> > > > > > > > >> >> timestamp
> >>> > > > > > > > >> >> > of all the messages in the log segment. Instead,
> >>> it is
> >>> > > the
> >>> > > > > > > largest
> >>> > > > > > > > >> >> > timestamp ever seen. We have to scan the entire
> >>> log to
> >>> > > find
> >>> > > > > the
> >>> > > > > > > > >> message
> >>> > > > > > > > >> >> > with smallest offset to see if we should roll.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> I think there are two uses for time-based log
> >>> rolling:
> >>> > > > > > > > >> >> 1. Making the offset lookup by timestamp work
> >>> > > > > > > > >> >> 2. Ensuring we don't retain data indefinitely if it
> >>> is
> >>> > > > supposed
> >>> > > > > > to
> >>> > > > > > > > get
> >>> > > > > > > > >> >> purged after 7 days
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> But think about these two use cases. (1) is totally
> >>> > > obviated
> >>> > > > by
> >>> > > > > > the
> >>> > > > > > > > >> >> time=>offset index we are adding which yields much
> >>> more
> >>> > > > > granular
> >>> > > > > > > > offset
> >>> > > > > > > > >> >> lookups. (2) Is actually totally broken if you
> >>> switch to
> >>> > > > append
> >>> > > > > > > time,
> >>> > > > > > > > >> >> right? If you want to be sure for security/privacy
> >>> > reasons
> >>> > > > you
> >>> > > > > > only
> >>> > > > > > > > >> retain
> >>> > > > > > > > >> >> 7 days of data then if the log append and create
> time
> >>> > > diverge
> >>> > > > > you
> >>> > > > > > > > >> actually
> >>> > > > > > > > >> >> violate this requirement.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> I think 95% of people care about (1) which is
> solved
> >>> in
> >>> > the
> >>> > > > > > > proposal
> >>> > > > > > > > and
> >>> > > > > > > > >> >> (2) is actually broken today as well as in both
> >>> > proposals.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> 3. Theoretically it is possible that an older log
> >>> segment
> >>> > > > > > contains
> >>> > > > > > > > >> >> > timestamps that are older than all the messages
> in
> >>> a
> >>> > > newer
> >>> > > > > log
> >>> > > > > > > > >> segment.
> >>> > > > > > > > >> >> It
> >>> > > > > > > > >> >> > would be weird that we are supposed to delete the
> >>> newer
> >>> > > log
> >>> > > > > > > segment
> >>> > > > > > > > >> >> before
> >>> > > > > > > > >> >> > we delete the older log segment.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> The index timestamps would always be a lower bound
> >>> (i.e.
> >>> > > the
> >>> > > > > > > maximum
> >>> > > > > > > > at
> >>> > > > > > > > >> >> that time) so I don't think that is possible.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >>  4. In bootstrap case, if we reload the data to a
> >>> Kafka
> >>> > > > > cluster,
> >>> > > > > > we
> >>> > > > > > > > have
> >>> > > > > > > > >> >> to
> >>> > > > > > > > >> >> > make sure we configure the topic correctly before
> >>> we
> >>> > load
> >>> > > > the
> >>> > > > > > > data.
> >>> > > > > > > > >> >> > Otherwise the message might either be rejected
> >>> because
> >>> > > the
> >>> > > > > > > > timestamp
> >>> > > > > > > > >> is
> >>> > > > > > > > >> >> too
> >>> > > > > > > > >> >> > old, or it might be deleted immediately because
> the
> >>> > > > retention
> >>> > > > > > > time
> >>> > > > > > > > has
> >>> > > > > > > > >> >> > reached.
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> See (1).
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> -Jay
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin
> >>> > > > > > > > <j...@linkedin.com.invalid
> >>> > > > > > > > >> >
> >>> > > > > > > > >> >> wrote:
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >> > Hey Jay and Guozhang,
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > Thanks a lot for the reply. So if I understand
> >>> > correctly,
> >>> > > > > Jay's
> >>> > > > > > > > >> proposal
> >>> > > > > > > > >> >> > is:
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > 1. Let client stamp the message create time.
> >>> > > > > > > > >> >> > 2. Broker build index based on client-stamped
> >>> message
> >>> > > > create
> >>> > > > > > > time.
> >>> > > > > > > > >> >> > 3. Broker only takes message whose create time is
> >>> > withing
> >>> > > > > > current
> >>> > > > > > > > time
> >>> > > > > > > > >> >> > plus/minus T (T is a configuration
> >>> *max.append.delay*,
> >>> > > > could
> >>> > > > > be
> >>> > > > > > > > topic
> >>> > > > > > > > >> >> level
> >>> > > > > > > > >> >> > configuration), if the timestamp is out of this
> >>> range,
> >>> > > > broker
> >>> > > > > > > > rejects
> >>> > > > > > > > >> >> the
> >>> > > > > > > > >> >> > message.
> >>> > > > > > > > >> >> > 4. Because the create time of messages can be out
> >>> of
> >>> > > order,
> >>> > > > > > when
> >>> > > > > > > > >> broker
> >>> > > > > > > > >> >> > builds the time based index it only provides the
> >>> > > guarantee
> >>> > > > > that
> >>> > > > > > > if
> >>> > > > > > > > a
> >>> > > > > > > > >> >> > consumer starts consuming from the offset
> returned
> >>> by
> >>> > > > > searching
> >>> > > > > > > by
> >>> > > > > > > > >> >> > timestamp t, they will not miss any message
> created
> >>> > after
> >>> > > > t,
> >>> > > > > > but
> >>> > > > > > > > might
> >>> > > > > > > > >> >> see
> >>> > > > > > > > >> >> > some messages created before t.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > To build the time based index, every time when a
> >>> broker
> >>> > > > needs
> >>> > > > > > to
> >>> > > > > > > > >> insert
> >>> > > > > > > > >> >> a
> >>> > > > > > > > >> >> > new time index entry, the entry would be
> >>> > > > > > > > {Largest_Timestamp_Ever_Seen
> >>> > > > > > > > >> ->
> >>> > > > > > > > >> >> > Current_Offset}. This basically means any
> timestamp
> >>> > > larger
> >>> > > > > than
> >>> > > > > > > the
> >>> > > > > > > > >> >> > Largest_Timestamp_Ever_Seen must come after this
> >>> offset
> >>> > > > > because
> >>> > > > > > > it
> >>> > > > > > > > >> never
> >>> > > > > > > > >> >> > saw them before. So we don't miss any message
> with
> >>> > larger
> >>> > > > > > > > timestamp.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > (@Guozhang, in this case, for log retention we
> only
> >>> > need
> >>> > > to
> >>> > > > > > take
> >>> > > > > > > a
> >>> > > > > > > > >> look
> >>> > > > > > > > >> >> at
> >>> > > > > > > > >> >> > the last time index entry, because it must be the
> >>> > largest
> >>> > > > > > > timestamp
> >>> > > > > > > > >> >> ever,
> >>> > > > > > > > >> >> > if that timestamp is overdue, we can safely
> delete
> >>> any
> >>> > > log
> >>> > > > > > > segment
> >>> > > > > > > > >> >> before
> >>> > > > > > > > >> >> > that. So we don't need to scan the log segment
> >>> file for
> >>> > > log
> >>> > > > > > > > retention)
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > I assume that we are still going to have the new
> >>> > > > FetchRequest
> >>> > > > > > to
> >>> > > > > > > > allow
> >>> > > > > > > > >> >> the
> >>> > > > > > > > >> >> > time index replication for replicas.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > I think Jay's main point here is that we don't
> >>> want to
> >>> > > have
> >>> > > > > two
> >>> > > > > > > > >> >> timestamp
> >>> > > > > > > > >> >> > concepts in Kafka, which I agree is a reasonable
> >>> > concern.
> >>> > > > > And I
> >>> > > > > > > > also
> >>> > > > > > > > >> >> agree
> >>> > > > > > > > >> >> > that create time is more meaningful than
> >>> LogAppendTime
> >>> > > for
> >>> > > > > > users.
> >>> > > > > > > > But
> >>> > > > > > > > >> I
> >>> > > > > > > > >> >> am
> >>> > > > > > > > >> >> > not sure if making everything base on Create Time
> >>> would
> >>> > > > work
> >>> > > > > in
> >>> > > > > > > all
> >>> > > > > > > > >> >> cases.
> >>> > > > > > > > >> >> > Here are my questions about this approach:
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > 1. Let's say we have two source clusters that are
> >>> > > mirrored
> >>> > > > to
> >>> > > > > > the
> >>> > > > > > > > same
> >>> > > > > > > > >> >> > target cluster. For some reason one of the mirror
> >>> maker
> >>> > > > from
> >>> > > > > a
> >>> > > > > > > > cluster
> >>> > > > > > > > >> >> dies
> >>> > > > > > > > >> >> > and after fix the issue we want to resume
> >>> mirroring. In
> >>> > > > this
> >>> > > > > > case
> >>> > > > > > > > it
> >>> > > > > > > > >> is
> >>> > > > > > > > >> >> > possible that when the mirror maker resumes
> >>> mirroring,
> >>> > > the
> >>> > > > > > > > timestamp
> >>> > > > > > > > >> of
> >>> > > > > > > > >> >> the
> >>> > > > > > > > >> >> > messages have already gone beyond the acceptable
> >>> > > timestamp
> >>> > > > > > range
> >>> > > > > > > on
> >>> > > > > > > > >> >> broker.
> >>> > > > > > > > >> >> > In order to let those messages go through, we
> have
> >>> to
> >>> > > bump
> >>> > > > up
> >>> > > > > > the
> >>> > > > > > > > >> >> > *max.append.delay
> >>> > > > > > > > >> >> > *for all the topics on the target broker. This
> >>> could be
> >>> > > > > > painful.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > 2. Let's say in the above scenario we let the
> >>> messages
> >>> > > in,
> >>> > > > at
> >>> > > > > > > that
> >>> > > > > > > > >> point
> >>> > > > > > > > >> >> > some log segments in the target cluster might
> have
> >>> a
> >>> > wide
> >>> > > > > range
> >>> > > > > > > of
> >>> > > > > > > > >> >> > timestamps, like Guozhang mentioned the log
> rolling
> >>> > could
> >>> > > > be
> >>> > > > > > > tricky
> >>> > > > > > > > >> >> because
> >>> > > > > > > > >> >> > the first time index entry does not necessarily
> >>> have
> >>> > the
> >>> > > > > > smallest
> >>> > > > > > > > >> >> timestamp
> >>> > > > > > > > >> >> > of all the messages in the log segment. Instead,
> >>> it is
> >>> > > the
> >>> > > > > > > largest
> >>> > > > > > > > >> >> > timestamp ever seen. We have to scan the entire
> >>> log to
> >>> > > find
> >>> > > > > the
> >>> > > > > > > > >> message
> >>> > > > > > > > >> >> > with smallest offset to see if we should roll.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > 3. Theoretically it is possible that an older log
> >>> > segment
> >>> > > > > > > contains
> >>> > > > > > > > >> >> > timestamps that are older than all the messages
> in
> >>> a
> >>> > > newer
> >>> > > > > log
> >>> > > > > > > > >> segment.
> >>> > > > > > > > >> >> It
> >>> > > > > > > > >> >> > would be weird that we are supposed to delete the
> >>> newer
> >>> > > log
> >>> > > > > > > segment
> >>> > > > > > > > >> >> before
> >>> > > > > > > > >> >> > we delete the older log segment.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > 4. In bootstrap case, if we reload the data to a
> >>> Kafka
> >>> > > > > cluster,
> >>> > > > > > > we
> >>> > > > > > > > >> have
> >>> > > > > > > > >> >> to
> >>> > > > > > > > >> >> > make sure we configure the topic correctly before
> >>> we
> >>> > load
> >>> > > > the
> >>> > > > > > > data.
> >>> > > > > > > > >> >> > Otherwise the message might either be rejected
> >>> because
> >>> > > the
> >>> > > > > > > > timestamp
> >>> > > > > > > > >> is
> >>> > > > > > > > >> >> too
> >>> > > > > > > > >> >> > old, or it might be deleted immediately because
> the
> >>> > > > retention
> >>> > > > > > > time
> >>> > > > > > > > has
> >>> > > > > > > > >> >> > reached.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > I am very concerned about the operational
> overhead
> >>> and
> >>> > > the
> >>> > > > > > > > ambiguity
> >>> > > > > > > > >> of
> >>> > > > > > > > >> >> > guarantees we introduce if we purely rely on
> >>> > CreateTime.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > It looks to me that the biggest issue of adopting
> >>> > > > CreateTime
> >>> > > > > > > > >> everywhere
> >>> > > > > > > > >> >> is
> >>> > > > > > > > >> >> > CreateTime can have big gaps. These gaps could be
> >>> > caused
> >>> > > by
> >>> > > > > > > several
> >>> > > > > > > > >> >> cases:
> >>> > > > > > > > >> >> > [1]. Faulty clients
> >>> > > > > > > > >> >> > [2]. Natural delays from different source
> >>> > > > > > > > >> >> > [3]. Bootstrap
> >>> > > > > > > > >> >> > [4]. Failure recovery
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > Jay's alternative proposal solves [1], perhaps
> >>> solve
> >>> > [2]
> >>> > > as
> >>> > > > > > well
> >>> > > > > > > > if we
> >>> > > > > > > > >> >> are
> >>> > > > > > > > >> >> > able to set a reasonable max.append.delay. But it
> >>> does
> >>> > > not
> >>> > > > > seem
> >>> > > > > > > > >> address
> >>> > > > > > > > >> >> [3]
> >>> > > > > > > > >> >> > and [4]. I actually doubt if [3] and [4] are
> >>> solvable
> >>> > > > because
> >>> > > > > > it
> >>> > > > > > > > looks
> >>> > > > > > > > >> >> the
> >>> > > > > > > > >> >> > CreateTime gap is unavoidable in those two cases.
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > Thanks,
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > Jiangjie (Becket) Qin
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <
> >>> > > > > > > wangg...@gmail.com
> >>> > > > > > > > >
> >>> > > > > > > > >> >> wrote:
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > > Just to complete Jay's option, here is my
> >>> > > understanding:
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > > 1. For log retention: if we want to remove data
> >>> > before
> >>> > > > time
> >>> > > > > > t,
> >>> > > > > > > we
> >>> > > > > > > > >> look
> >>> > > > > > > > >> >> > into
> >>> > > > > > > > >> >> > > the index file of each segment and find the
> >>> largest
> >>> > > > > timestamp
> >>> > > > > > > t'
> >>> > > > > > > > <
> >>> > > > > > > > >> t,
> >>> > > > > > > > >> >> > find
> >>> > > > > > > > >> >> > > the corresponding timestamp and start scanning
> >>> to the
> >>> > > end
> >>> > > > > of
> >>> > > > > > > the
> >>> > > > > > > > >> >> segment,
> >>> > > > > > > > >> >> > > if there is no entry with timestamp >= t, we
> can
> >>> > delete
> >>> > > > > this
> >>> > > > > > > > >> segment;
> >>> > > > > > > > >> >> if
> >>> > > > > > > > >> >> > a
> >>> > > > > > > > >> >> > > segment's index smallest timestamp is larger
> >>> than t,
> >>> > we
> >>> > > > can
> >>> > > > > > > skip
> >>> > > > > > > > >> that
> >>> > > > > > > > >> >> > > segment.
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > > 2. For log rolling: if we want to start a new
> >>> segment
> >>> > > > after
> >>> > > > > > > time
> >>> > > > > > > > t,
> >>> > > > > > > > >> we
> >>> > > > > > > > >> >> > look
> >>> > > > > > > > >> >> > > into the active segment's index file, if the
> >>> largest
> >>> > > > > > timestamp
> >>> > > > > > > is
> >>> > > > > > > > >> >> > already >
> >>> > > > > > > > >> >> > > t, we can roll a new segment immediately; if it
> >>> is <
> >>> > t,
> >>> > > > we
> >>> > > > > > read
> >>> > > > > > > > its
> >>> > > > > > > > >> >> > > corresponding offset and start scanning to the
> >>> end of
> >>> > > the
> >>> > > > > > > > segment,
> >>> > > > > > > > >> if
> >>> > > > > > > > >> >> we
> >>> > > > > > > > >> >> > > find a record whose timestamp > t, we can roll
> a
> >>> new
> >>> > > > > segment.
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > > For log rolling we only need to possibly scan a
> >>> small
> >>> > > > > portion
> >>> > > > > > > the
> >>> > > > > > > > >> >> active
> >>> > > > > > > > >> >> > > segment, which should be fine; for log
> retention
> >>> we
> >>> > may
> >>> > > > in
> >>> > > > > > the
> >>> > > > > > > > worst
> >>> > > > > > > > >> >> case
> >>> > > > > > > > >> >> > > end up scanning all segments, but in practice
> we
> >>> may
> >>> > > skip
> >>> > > > > > most
> >>> > > > > > > of
> >>> > > > > > > > >> them
> >>> > > > > > > > >> >> > > since their smallest timestamp in the index
> file
> >>> is
> >>> > > > larger
> >>> > > > > > than
> >>> > > > > > > > t.
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > > Guozhang
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > > On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <
> >>> > > > > > j...@confluent.io>
> >>> > > > > > > > >> wrote:
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > > > I think it should be possible to index
> >>> out-of-order
> >>> > > > > > > timestamps.
> >>> > > > > > > > >> The
> >>> > > > > > > > >> >> > > > timestamp index would be similar to the
> offset
> >>> > > index, a
> >>> > > > > > > memory
> >>> > > > > > > > >> >> mapped
> >>> > > > > > > > >> >> > > file
> >>> > > > > > > > >> >> > > > appended to as part of the log append, but
> >>> would
> >>> > have
> >>> > > > the
> >>> > > > > > > > format
> >>> > > > > > > > >> >> > > >   timestamp offset
> >>> > > > > > > > >> >> > > > The timestamp entries would be monotonic and
> as
> >>> > with
> >>> > > > the
> >>> > > > > > > offset
> >>> > > > > > > > >> >> index
> >>> > > > > > > > >> >> > > would
> >>> > > > > > > > >> >> > > > be no more often then every 4k (or some
> >>> > configurable
> >>> > > > > > > threshold
> >>> > > > > > > > to
> >>> > > > > > > > >> >> keep
> >>> > > > > > > > >> >> > > the
> >>> > > > > > > > >> >> > > > index small--actually for timestamp it could
> >>> > probably
> >>> > > > be
> >>> > > > > > much
> >>> > > > > > > > more
> >>> > > > > > > > >> >> > sparse
> >>> > > > > > > > >> >> > > > than 4k).
> >>> > > > > > > > >> >> > > >
> >>> > > > > > > > >> >> > > > A search for a timestamp t yields an offset o
> >>> > before
> >>> > > > > which
> >>> > > > > > no
> >>> > > > > > > > >> prior
> >>> > > > > > > > >> >> > > message
> >>> > > > > > > > >> >> > > > has a timestamp >= t. In other words if you
> >>> read
> >>> > the
> >>> > > > log
> >>> > > > > > > > starting
> >>> > > > > > > > >> >> with
> >>> > > > > > > > >> >> > o
> >>> > > > > > > > >> >> > > > you are guaranteed not to miss any messages
> >>> > occurring
> >>> > > > at
> >>> > > > > t
> >>> > > > > > or
> >>> > > > > > > > >> later
> >>> > > > > > > > >> >> > > though
> >>> > > > > > > > >> >> > > > you may get many before t (due to
> >>> > out-of-orderness).
> >>> > > > > Unlike
> >>> > > > > > > the
> >>> > > > > > > > >> >> offset
> >>> > > > > > > > >> >> > > > index this bound doesn't really have to be
> >>> tight
> >>> > > (i.e.
> >>> > > > > > > > probably no
> >>> > > > > > > > >> >> need
> >>> > > > > > > > >> >> > > to
> >>> > > > > > > > >> >> > > > go search the log itself, though you could).
> >>> > > > > > > > >> >> > > >
> >>> > > > > > > > >> >> > > > -Jay
> >>> > > > > > > > >> >> > > >
> >>> > > > > > > > >> >> > > > On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <
> >>> > > > > > > j...@confluent.io>
> >>> > > > > > > > >> >> wrote:
> >>> > > > > > > > >> >> > > >
> >>> > > > > > > > >> >> > > > > Here's my basic take:
> >>> > > > > > > > >> >> > > > > - I agree it would be nice to have a notion
> >>> of
> >>> > time
> >>> > > > > baked
> >>> > > > > > > in
> >>> > > > > > > > if
> >>> > > > > > > > >> it
> >>> > > > > > > > >> >> > were
> >>> > > > > > > > >> >> > > > > done right
> >>> > > > > > > > >> >> > > > > - All the proposals so far seem pretty
> >>> complex--I
> >>> > > > think
> >>> > > > > > > they
> >>> > > > > > > > >> might
> >>> > > > > > > > >> >> > make
> >>> > > > > > > > >> >> > > > > things worse rather than better overall
> >>> > > > > > > > >> >> > > > > - I think adding 2x8 byte timestamps to the
> >>> > message
> >>> > > > is
> >>> > > > > > > > probably
> >>> > > > > > > > >> a
> >>> > > > > > > > >> >> > > > > non-starter from a size perspective
> >>> > > > > > > > >> >> > > > > - Even if it isn't in the message, having
> two
> >>> > > notions
> >>> > > > > of
> >>> > > > > > > time
> >>> > > > > > > > >> that
> >>> > > > > > > > >> >> > > > control
> >>> > > > > > > > >> >> > > > > different things is a bit confusing
> >>> > > > > > > > >> >> > > > > - The mechanics of basing retention etc on
> >>> log
> >>> > > append
> >>> > > > > > time
> >>> > > > > > > > when
> >>> > > > > > > > >> >> > that's
> >>> > > > > > > > >> >> > > > not
> >>> > > > > > > > >> >> > > > > in the log seem complicated
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > To that end here is a possible 4th option.
> >>> Let me
> >>> > > > know
> >>> > > > > > what
> >>> > > > > > > > you
> >>> > > > > > > > >> >> > think.
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > The basic idea is that the message creation
> >>> time
> >>> > is
> >>> > > > > > closest
> >>> > > > > > > > to
> >>> > > > > > > > >> >> what
> >>> > > > > > > > >> >> > the
> >>> > > > > > > > >> >> > > > > user actually cares about but is dangerous
> >>> if set
> >>> > > > > wrong.
> >>> > > > > > So
> >>> > > > > > > > >> rather
> >>> > > > > > > > >> >> > than
> >>> > > > > > > > >> >> > > > > substitute another notion of time, let's
> try
> >>> to
> >>> > > > ensure
> >>> > > > > > the
> >>> > > > > > > > >> >> > correctness
> >>> > > > > > > > >> >> > > of
> >>> > > > > > > > >> >> > > > > message creation time by preventing
> >>> arbitrarily
> >>> > bad
> >>> > > > > > message
> >>> > > > > > > > >> >> creation
> >>> > > > > > > > >> >> > > > times.
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > First, let's see if we can agree that log
> >>> append
> >>> > > time
> >>> > > > > is
> >>> > > > > > > not
> >>> > > > > > > > >> >> > something
> >>> > > > > > > > >> >> > > > > anyone really cares about but rather an
> >>> > > > implementation
> >>> > > > > > > > detail.
> >>> > > > > > > > >> The
> >>> > > > > > > > >> >> > > > > timestamp that matters to the user is when
> >>> the
> >>> > > > message
> >>> > > > > > > > occurred
> >>> > > > > > > > >> >> (the
> >>> > > > > > > > >> >> > > > > creation time). The log append time is
> >>> basically
> >>> > > just
> >>> > > > > an
> >>> > > > > > > > >> >> > approximation
> >>> > > > > > > > >> >> > > to
> >>> > > > > > > > >> >> > > > > this on the assumption that the message
> >>> creation
> >>> > > and
> >>> > > > > the
> >>> > > > > > > > message
> >>> > > > > > > > >> >> > > receive
> >>> > > > > > > > >> >> > > > on
> >>> > > > > > > > >> >> > > > > the server occur pretty close together and
> >>> the
> >>> > > reason
> >>> > > > > to
> >>> > > > > > > > prefer
> >>> > > > > > > > >> .
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > But as these values diverge the issue
> starts
> >>> to
> >>> > > > become
> >>> > > > > > > > apparent.
> >>> > > > > > > > >> >> Say
> >>> > > > > > > > >> >> > > you
> >>> > > > > > > > >> >> > > > > set the retention to one week and then
> mirror
> >>> > data
> >>> > > > > from a
> >>> > > > > > > > topic
> >>> > > > > > > > >> >> > > > containing
> >>> > > > > > > > >> >> > > > > two years of retention. Your intention is
> >>> clearly
> >>> > > to
> >>> > > > > keep
> >>> > > > > > > the
> >>> > > > > > > > >> last
> >>> > > > > > > > >> >> > > week,
> >>> > > > > > > > >> >> > > > > but because the mirroring is appending
> right
> >>> now
> >>> > > you
> >>> > > > > will
> >>> > > > > > > > keep
> >>> > > > > > > > >> two
> >>> > > > > > > > >> >> > > years.
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > The reason we are liking log append time is
> >>> > because
> >>> > > > we
> >>> > > > > > are
> >>> > > > > > > > >> >> > > (justifiably)
> >>> > > > > > > > >> >> > > > > concerned that in certain situations the
> >>> creation
> >>> > > > time
> >>> > > > > > may
> >>> > > > > > > > not
> >>> > > > > > > > >> be
> >>> > > > > > > > >> >> > > > > trustworthy. This same problem exists on
> the
> >>> > > servers
> >>> > > > > but
> >>> > > > > > > > there
> >>> > > > > > > > >> are
> >>> > > > > > > > >> >> > > fewer
> >>> > > > > > > > >> >> > > > > servers and they just run the kafka code so
> >>> it is
> >>> > > > less
> >>> > > > > of
> >>> > > > > > > an
> >>> > > > > > > > >> >> issue.
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > There are two possible ways to handle this:
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > >    1. Just tell people to add size based
> >>> > > retention. I
> >>> > > > > > think
> >>> > > > > > > > this
> >>> > > > > > > > >> >> is
> >>> > > > > > > > >> >> > not
> >>> > > > > > > > >> >> > > > >    entirely unreasonable, we're basically
> >>> saying
> >>> > we
> >>> > > > > > retain
> >>> > > > > > > > data
> >>> > > > > > > > >> >> based
> >>> > > > > > > > >> >> > > on
> >>> > > > > > > > >> >> > > > the
> >>> > > > > > > > >> >> > > > >    timestamp you give us in the data. If
> you
> >>> give
> >>> > > us
> >>> > > > > bad
> >>> > > > > > > > data we
> >>> > > > > > > > >> >> will
> >>> > > > > > > > >> >> > > > retain
> >>> > > > > > > > >> >> > > > >    it for a bad amount of time. If you want
> >>> to
> >>> > > ensure
> >>> > > > > we
> >>> > > > > > > > don't
> >>> > > > > > > > >> >> retain
> >>> > > > > > > > >> >> > > > "too
> >>> > > > > > > > >> >> > > > >    much" data, define "too much" by
> setting a
> >>> > > > > time-based
> >>> > > > > > > > >> retention
> >>> > > > > > > > >> >> > > > setting.
> >>> > > > > > > > >> >> > > > >    This is not entirely unreasonable but
> >>> kind of
> >>> > > > > suffers
> >>> > > > > > > > from a
> >>> > > > > > > > >> >> "one
> >>> > > > > > > > >> >> > > bad
> >>> > > > > > > > >> >> > > > >    apple" problem in a very large
> >>> environment.
> >>> > > > > > > > >> >> > > > >    2. Prevent bad timestamps. In general we
> >>> can't
> >>> > > > say a
> >>> > > > > > > > >> timestamp
> >>> > > > > > > > >> >> is
> >>> > > > > > > > >> >> > > bad.
> >>> > > > > > > > >> >> > > > >    However the definition we're implicitly
> >>> using
> >>> > is
> >>> > > > > that
> >>> > > > > > we
> >>> > > > > > > > >> think
> >>> > > > > > > > >> >> > there
> >>> > > > > > > > >> >> > > > are a
> >>> > > > > > > > >> >> > > > >    set of topics/clusters where the
> creation
> >>> > > > timestamp
> >>> > > > > > > should
> >>> > > > > > > > >> >> always
> >>> > > > > > > > >> >> > be
> >>> > > > > > > > >> >> > > > "very
> >>> > > > > > > > >> >> > > > >    close" to the log append timestamp. This
> >>> is
> >>> > true
> >>> > > > for
> >>> > > > > > > data
> >>> > > > > > > > >> >> sources
> >>> > > > > > > > >> >> > > > that have
> >>> > > > > > > > >> >> > > > >    no buffering capability (which at
> >>> LinkedIn is
> >>> > > very
> >>> > > > > > > common,
> >>> > > > > > > > >> but
> >>> > > > > > > > >> >> is
> >>> > > > > > > > >> >> > > > more rare
> >>> > > > > > > > >> >> > > > >    elsewhere). The solution in this case
> >>> would be
> >>> > > to
> >>> > > > > > allow
> >>> > > > > > > a
> >>> > > > > > > > >> >> setting
> >>> > > > > > > > >> >> > > > along the
> >>> > > > > > > > >> >> > > > >    lines of max.append.delay which checks
> the
> >>> > > > creation
> >>> > > > > > > > timestamp
> >>> > > > > > > > >> >> > > against
> >>> > > > > > > > >> >> > > > the
> >>> > > > > > > > >> >> > > > >    server time to look for too large a
> >>> > divergence.
> >>> > > > The
> >>> > > > > > > > solution
> >>> > > > > > > > >> >> would
> >>> > > > > > > > >> >> > > > either
> >>> > > > > > > > >> >> > > > >    be to reject the message or to override
> it
> >>> > with
> >>> > > > the
> >>> > > > > > > server
> >>> > > > > > > > >> >> time.
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > So in LI's environment you would configure
> >>> the
> >>> > > > clusters
> >>> > > > > > > used
> >>> > > > > > > > for
> >>> > > > > > > > >> >> > > direct,
> >>> > > > > > > > >> >> > > > > unbuffered, message production (e.g.
> >>> tracking and
> >>> > > > > metrics
> >>> > > > > > > > local)
> >>> > > > > > > > >> >> to
> >>> > > > > > > > >> >> > > > enforce
> >>> > > > > > > > >> >> > > > > a reasonably aggressive timestamp bound
> (say
> >>> 10
> >>> > > > mins),
> >>> > > > > > and
> >>> > > > > > > > all
> >>> > > > > > > > >> >> other
> >>> > > > > > > > >> >> > > > > clusters would just inherent these.
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > The downside of this approach is requiring
> >>> the
> >>> > > > special
> >>> > > > > > > > >> >> configuration.
> >>> > > > > > > > >> >> > > > > However I think in 99% of environments this
> >>> could
> >>> > > be
> >>> > > > > > > skipped
> >>> > > > > > > > >> >> > entirely,
> >>> > > > > > > > >> >> > > > it's
> >>> > > > > > > > >> >> > > > > only when the ratio of clients to servers
> >>> gets so
> >>> > > > > massive
> >>> > > > > > > > that
> >>> > > > > > > > >> you
> >>> > > > > > > > >> >> > need
> >>> > > > > > > > >> >> > > > to
> >>> > > > > > > > >> >> > > > > do this. The primary upside is that you
> have
> >>> a
> >>> > > single
> >>> > > > > > > > >> >> authoritative
> >>> > > > > > > > >> >> > > > notion
> >>> > > > > > > > >> >> > > > > of time which is closest to what a user
> would
> >>> > want
> >>> > > > and
> >>> > > > > is
> >>> > > > > > > > stored
> >>> > > > > > > > >> >> > > directly
> >>> > > > > > > > >> >> > > > > in the message.
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > I'm also assuming there is a workable
> >>> approach
> >>> > for
> >>> > > > > > indexing
> >>> > > > > > > > >> >> > > non-monotonic
> >>> > > > > > > > >> >> > > > > timestamps, though I haven't actually
> worked
> >>> that
> >>> > > > out.
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > -Jay
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > > On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie
> Qin
> >>> > > > > > > > >> >> > <j...@linkedin.com.invalid
> >>> > > > > > > > >> >> > > >
> >>> > > > > > > > >> >> > > > > wrote:
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > >> Bumping up this thread although most of
> the
> >>> > > > discussion
> >>> > > > > > > were
> >>> > > > > > > > on
> >>> > > > > > > > >> >> the
> >>> > > > > > > > >> >> > > > >> discussion thread of KIP-31 :)
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >> I just updated the KIP page to add
> detailed
> >>> > > solution
> >>> > > > > for
> >>> > > > > > > the
> >>> > > > > > > > >> >> option
> >>> > > > > > > > >> >> > > > >> (Option
> >>> > > > > > > > >> >> > > > >> 3) that does not expose the LogAppendTime
> to
> >>> > user.
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > >
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >>
> >>> > > > > > > > >>
> >>> > > > > > > >
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >> The option has a minor change to the fetch
> >>> > request
> >>> > > > to
> >>> > > > > > > allow
> >>> > > > > > > > >> >> fetching
> >>> > > > > > > > >> >> > > > time
> >>> > > > > > > > >> >> > > > >> index entry as well. I kind of like this
> >>> > solution
> >>> > > > > > because
> >>> > > > > > > > its
> >>> > > > > > > > >> >> just
> >>> > > > > > > > >> >> > > doing
> >>> > > > > > > > >> >> > > > >> what we need without introducing other
> >>> things.
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >> It will be great to see what are the
> >>> feedback. I
> >>> > > can
> >>> > > > > > > explain
> >>> > > > > > > > >> more
> >>> > > > > > > > >> >> > > during
> >>> > > > > > > > >> >> > > > >> tomorrow's KIP hangout.
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >> Thanks,
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >> Jiangjie (Becket) Qin
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >> On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie
> >>> Qin <
> >>> > > > > > > > >> j...@linkedin.com
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >> > > > wrote:
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >> > Hi Jay,
> >>> > > > > > > > >> >> > > > >> >
> >>> > > > > > > > >> >> > > > >> > I just copy/pastes here your feedback on
> >>> the
> >>> > > > > timestamp
> >>> > > > > > > > >> proposal
> >>> > > > > > > > >> >> > that
> >>> > > > > > > > >> >> > > > was
> >>> > > > > > > > >> >> > > > >> > in the discussion thread of KIP-31.
> >>> Please see
> >>> > > the
> >>> > > > > > > replies
> >>> > > > > > > > >> >> inline.
> >>> > > > > > > > >> >> > > > >> > The main change I made compared with
> >>> previous
> >>> > > > > proposal
> >>> > > > > > > is
> >>> > > > > > > > to
> >>> > > > > > > > >> >> add
> >>> > > > > > > > >> >> > > both
> >>> > > > > > > > >> >> > > > >> > CreateTime and LogAppendTime to the
> >>> message.
> >>> > > > > > > > >> >> > > > >> >
> >>> > > > > > > > >> >> > > > >> > On Tue, Sep 8, 2015 at 10:57 AM, Jay
> >>> Kreps <
> >>> > > > > > > > j...@confluent.io
> >>> > > > > > > > >> >
> >>> > > > > > > > >> >> > > wrote:
> >>> > > > > > > > >> >> > > > >> >
> >>> > > > > > > > >> >> > > > >> > > Hey Beckett,
> >>> > > > > > > > >> >> > > > >> > >
> >>> > > > > > > > >> >> > > > >> > > I was proposing splitting up the KIP
> >>> just
> >>> > for
> >>> > > > > > > > simplicity of
> >>> > > > > > > > >> >> > > > >> discussion.
> >>> > > > > > > > >> >> > > > >> > You
> >>> > > > > > > > >> >> > > > >> > > can still implement them in one
> patch. I
> >>> > think
> >>> > > > > > > > otherwise it
> >>> > > > > > > > >> >> will
> >>> > > > > > > > >> >> > > be
> >>> > > > > > > > >> >> > > > >> hard
> >>> > > > > > > > >> >> > > > >> > to
> >>> > > > > > > > >> >> > > > >> > > discuss/vote on them since if you like
> >>> the
> >>> > > > offset
> >>> > > > > > > > proposal
> >>> > > > > > > > >> >> but
> >>> > > > > > > > >> >> > not
> >>> > > > > > > > >> >> > > > the
> >>> > > > > > > > >> >> > > > >> > time
> >>> > > > > > > > >> >> > > > >> > > proposal what do you do?
> >>> > > > > > > > >> >> > > > >> > >
> >>> > > > > > > > >> >> > > > >> > > Introducing a second notion of time
> into
> >>> > Kafka
> >>> > > > is
> >>> > > > > a
> >>> > > > > > > > pretty
> >>> > > > > > > > >> >> > massive
> >>> > > > > > > > >> >> > > > >> > > philosophical change so it kind of
> >>> warrants
> >>> > > it's
> >>> > > > > own
> >>> > > > > > > > KIP I
> >>> > > > > > > > >> >> think
> >>> > > > > > > > >> >> > > it
> >>> > > > > > > > >> >> > > > >> > isn't
> >>> > > > > > > > >> >> > > > >> > > just "Change message format".
> >>> > > > > > > > >> >> > > > >> > >
> >>> > > > > > > > >> >> > > > >> > > WRT time I think one thing to clarify
> >>> in the
> >>> > > > > > proposal
> >>> > > > > > > is
> >>> > > > > > > > >> how
> >>> > > > > > > > >> >> MM
> >>> > > > > > > > >> >> > > will
> >>> > > > > > > > >> >> > > > >> have
> >>> > > > > > > > >> >> > > > >> > > access to set the timestamp?
> Presumably
> >>> this
> >>> > > > will
> >>> > > > > > be a
> >>> > > > > > > > new
> >>> > > > > > > > >> >> field
> >>> > > > > > > > >> >> > > in
> >>> > > > > > > > >> >> > > > >> > > ProducerRecord, right? If so then any
> >>> user
> >>> > can
> >>> > > > set
> >>> > > > > > the
> >>> > > > > > > > >> >> > timestamp,
> >>> > > > > > > > >> >> > > > >> right?
> >>> > > > > > > > >> >> > > > >> > > I'm not sure you answered the
> questions
> >>> > around
> >>> > > > how
> >>> > > > > > > this
> >>> > > > > > > > >> will
> >>> > > > > > > > >> >> > work
> >>> > > > > > > > >> >> > > > for
> >>> > > > > > > > >> >> > > > >> MM
> >>> > > > > > > > >> >> > > > >> > > since when MM retains timestamps from
> >>> > multiple
> >>> > > > > > > > partitions
> >>> > > > > > > > >> >> they
> >>> > > > > > > > >> >> > > will
> >>> > > > > > > > >> >> > > > >> then
> >>> > > > > > > > >> >> > > > >> > be
> >>> > > > > > > > >> >> > > > >> > > out of order and in the past (so the
> >>> > > > > > > > >> >> max(lastAppendedTimestamp,
> >>> > > > > > > > >> >> > > > >> > > currentTimeMillis) override you
> proposed
> >>> > will
> >>> > > > not
> >>> > > > > > > work,
> >>> > > > > > > > >> >> right?).
> >>> > > > > > > > >> >> > > If
> >>> > > > > > > > >> >> > > > we
> >>> > > > > > > > >> >> > > > >> > > don't do this then when you set up
> >>> mirroring
> >>> > > the
> >>> > > > > > data
> >>> > > > > > > > will
> >>> > > > > > > > >> >> all
> >>> > > > > > > > >> >> > be
> >>> > > > > > > > >> >> > > > new
> >>> > > > > > > > >> >> > > > >> and
> >>> > > > > > > > >> >> > > > >> > > you have the same retention problem
> you
> >>> > > > described.
> >>> > > > > > > > Maybe I
> >>> > > > > > > > >> >> > missed
> >>> > > > > > > > >> >> > > > >> > > something...?
> >>> > > > > > > > >> >> > > > >> > lastAppendedTimestamp means the
> timestamp
> >>> of
> >>> > the
> >>> > > > > > message
> >>> > > > > > > > that
> >>> > > > > > > > >> >> last
> >>> > > > > > > > >> >> > > > >> > appended to the log.
> >>> > > > > > > > >> >> > > > >> > If a broker is a leader, since it will
> >>> assign
> >>> > > the
> >>> > > > > > > > timestamp
> >>> > > > > > > > >> by
> >>> > > > > > > > >> >> > > itself,
> >>> > > > > > > > >> >> > > > >> the
> >>> > > > > > > > >> >> > > > >> > lastAppenedTimestamp will be its local
> >>> clock
> >>> > > when
> >>> > > > > > append
> >>> > > > > > > > the
> >>> > > > > > > > >> >> last
> >>> > > > > > > > >> >> > > > >> message.
> >>> > > > > > > > >> >> > > > >> > So if there is no leader migration,
> >>> > > > > > > > >> max(lastAppendedTimestamp,
> >>> > > > > > > > >> >> > > > >> > currentTimeMillis) = currentTimeMillis.
> >>> > > > > > > > >> >> > > > >> > If a broker is a follower, because it
> will
> >>> > keep
> >>> > > > the
> >>> > > > > > > > leader's
> >>> > > > > > > > >> >> > > timestamp
> >>> > > > > > > > >> >> > > > >> > unchanged, the lastAppendedTime would be
> >>> the
> >>> > > > > leader's
> >>> > > > > > > > clock
> >>> > > > > > > > >> >> when
> >>> > > > > > > > >> >> > it
> >>> > > > > > > > >> >> > > > >> appends
> >>> > > > > > > > >> >> > > > >> > that message message. It keeps track of
> >>> the
> >>> > > > > > > > lastAppendedTime
> >>> > > > > > > > >> >> only
> >>> > > > > > > > >> >> > in
> >>> > > > > > > > >> >> > > > >> case
> >>> > > > > > > > >> >> > > > >> > it becomes leader later on. At that
> >>> point, it
> >>> > is
> >>> > > > > > > possible
> >>> > > > > > > > >> that
> >>> > > > > > > > >> >> the
> >>> > > > > > > > >> >> > > > >> > timestamp of the last appended message
> was
> >>> > > stamped
> >>> > > > > by
> >>> > > > > > > old
> >>> > > > > > > > >> >> leader,
> >>> > > > > > > > >> >> > > but
> >>> > > > > > > > >> >> > > > >> the
> >>> > > > > > > > >> >> > > > >> > new leader's currentTimeMillis <
> >>> > > lastAppendedTime.
> >>> > > > > If
> >>> > > > > > a
> >>> > > > > > > > new
> >>> > > > > > > > >> >> > message
> >>> > > > > > > > >> >> > > > >> comes,
> >>> > > > > > > > >> >> > > > >> > instead of stamp it with new leader's
> >>> > > > > > currentTimeMillis,
> >>> > > > > > > > we
> >>> > > > > > > > >> >> have
> >>> > > > > > > > >> >> > to
> >>> > > > > > > > >> >> > > > >> stamp
> >>> > > > > > > > >> >> > > > >> > it to lastAppendedTime to avoid the
> >>> timestamp
> >>> > in
> >>> > > > the
> >>> > > > > > log
> >>> > > > > > > > >> going
> >>> > > > > > > > >> >> > > > backward.
> >>> > > > > > > > >> >> > > > >> > The max(lastAppendedTimestamp,
> >>> > > currentTimeMillis)
> >>> > > > is
> >>> > > > > > > > purely
> >>> > > > > > > > >> >> based
> >>> > > > > > > > >> >> > on
> >>> > > > > > > > >> >> > > > the
> >>> > > > > > > > >> >> > > > >> > broker side clock. If MM produces
> message
> >>> with
> >>> > > > > > different
> >>> > > > > > > > >> >> > > LogAppendTime
> >>> > > > > > > > >> >> > > > >> in
> >>> > > > > > > > >> >> > > > >> > source clusters to the same target
> >>> cluster,
> >>> > the
> >>> > > > > > > > LogAppendTime
> >>> > > > > > > > >> >> will
> >>> > > > > > > > >> >> > > be
> >>> > > > > > > > >> >> > > > >> > ignored  re-stamped by target cluster.
> >>> > > > > > > > >> >> > > > >> > I added a use case example for mirror
> >>> maker in
> >>> > > > > KIP-32.
> >>> > > > > > > > Also
> >>> > > > > > > > >> >> there
> >>> > > > > > > > >> >> > > is a
> >>> > > > > > > > >> >> > > > >> > corner case discussion about when we
> need
> >>> the
> >>> > > > > > > > >> >> > max(lastAppendedTime,
> >>> > > > > > > > >> >> > > > >> > currentTimeMillis) trick. Could you
> take a
> >>> > look
> >>> > > > and
> >>> > > > > > see
> >>> > > > > > > if
> >>> > > > > > > > >> that
> >>> > > > > > > > >> >> > > > answers
> >>> > > > > > > > >> >> > > > >> > your question?
> >>> > > > > > > > >> >> > > > >> >
> >>> > > > > > > > >> >> > > > >> > >
> >>> > > > > > > > >> >> > > > >> > > My main motivation is that given that
> >>> both
> >>> > > Samza
> >>> > > > > and
> >>> > > > > > > > Kafka
> >>> > > > > > > > >> >> > streams
> >>> > > > > > > > >> >> > > > are
> >>> > > > > > > > >> >> > > > >> > > doing work that implies a mandatory
> >>> > > > client-defined
> >>> > > > > > > > notion
> >>> > > > > > > > >> of
> >>> > > > > > > > >> >> > > time, I
> >>> > > > > > > > >> >> > > > >> > really
> >>> > > > > > > > >> >> > > > >> > > think introducing a different
> mandatory
> >>> > notion
> >>> > > > of
> >>> > > > > > time
> >>> > > > > > > > in
> >>> > > > > > > > >> >> Kafka
> >>> > > > > > > > >> >> > is
> >>> > > > > > > > >> >> > > > >> going
> >>> > > > > > > > >> >> > > > >> > to
> >>> > > > > > > > >> >> > > > >> > > be quite odd. We should think hard
> >>> about how
> >>> > > > > > > > client-defined
> >>> > > > > > > > >> >> time
> >>> > > > > > > > >> >> > > > could
> >>> > > > > > > > >> >> > > > >> > > work. I'm not sure if it can, but I'm
> >>> also
> >>> > not
> >>> > > > > sure
> >>> > > > > > > > that it
> >>> > > > > > > > >> >> > can't.
> >>> > > > > > > > >> >> > > > >> Having
> >>> > > > > > > > >> >> > > > >> > > both will be odd. Did you chat about
> >>> this
> >>> > with
> >>> > > > > > > > Yi/Kartik on
> >>> > > > > > > > >> >> the
> >>> > > > > > > > >> >> > > > Samza
> >>> > > > > > > > >> >> > > > >> > side?
> >>> > > > > > > > >> >> > > > >> > I talked with Kartik and realized that
> it
> >>> > would
> >>> > > be
> >>> > > > > > > useful
> >>> > > > > > > > to
> >>> > > > > > > > >> >> have
> >>> > > > > > > > >> >> > a
> >>> > > > > > > > >> >> > > > >> client
> >>> > > > > > > > >> >> > > > >> > timestamp to facilitate use cases like
> >>> stream
> >>> > > > > > > processing.
> >>> > > > > > > > >> >> > > > >> > I was trying to figure out if we can
> >>> simply
> >>> > use
> >>> > > > > client
> >>> > > > > > > > >> >> timestamp
> >>> > > > > > > > >> >> > > > without
> >>> > > > > > > > >> >> > > > >> > introducing the server time. There are
> >>> some
> >>> > > > > discussion
> >>> > > > > > > in
> >>> > > > > > > > the
> >>> > > > > > > > >> >> KIP.
> >>> > > > > > > > >> >> > > > >> > The key problem we want to solve here is
> >>> > > > > > > > >> >> > > > >> > 1. We want log retention and rolling to
> >>> depend
> >>> > > on
> >>> > > > > > server
> >>> > > > > > > > >> clock.
> >>> > > > > > > > >> >> > > > >> > 2. We want to make sure the
> log-assiciated
> >>> > > > timestamp
> >>> > > > > > to
> >>> > > > > > > be
> >>> > > > > > > > >> >> > retained
> >>> > > > > > > > >> >> > > > when
> >>> > > > > > > > >> >> > > > >> > replicas moves.
> >>> > > > > > > > >> >> > > > >> > 3. We want to use the timestamp in some
> >>> way
> >>> > that
> >>> > > > can
> >>> > > > > > > allow
> >>> > > > > > > > >> >> > searching
> >>> > > > > > > > >> >> > > > by
> >>> > > > > > > > >> >> > > > >> > timestamp.
> >>> > > > > > > > >> >> > > > >> > For 1 and 2, an alternative is to pass
> the
> >>> > > > > > > log-associated
> >>> > > > > > > > >> >> > timestamp
> >>> > > > > > > > >> >> > > > >> > through replication, that means we need
> to
> >>> > have
> >>> > > a
> >>> > > > > > > > different
> >>> > > > > > > > >> >> > protocol
> >>> > > > > > > > >> >> > > > for
> >>> > > > > > > > >> >> > > > >> > replica fetching to pass log-associated
> >>> > > timestamp.
> >>> > > > > It
> >>> > > > > > is
> >>> > > > > > > > >> >> actually
> >>> > > > > > > > >> >> > > > >> > complicated and there could be a lot of
> >>> corner
> >>> > > > cases
> >>> > > > > > to
> >>> > > > > > > > >> handle.
> >>> > > > > > > > >> >> > e.g.
> >>> > > > > > > > >> >> > > > >> what
> >>> > > > > > > > >> >> > > > >> > if an old leader started to fetch from
> >>> the new
> >>> > > > > leader,
> >>> > > > > > > > should
> >>> > > > > > > > >> >> it
> >>> > > > > > > > >> >> > > also
> >>> > > > > > > > >> >> > > > >> > update all of its old log segment
> >>> timestamp?
> >>> > > > > > > > >> >> > > > >> > I think actually client side timestamp
> >>> would
> >>> > be
> >>> > > > > better
> >>> > > > > > > > for 3
> >>> > > > > > > > >> >> if we
> >>> > > > > > > > >> >> > > can
> >>> > > > > > > > >> >> > > > >> > find a way to make it work.
> >>> > > > > > > > >> >> > > > >> > So far I am not able to convince myself
> >>> that
> >>> > > only
> >>> > > > > > having
> >>> > > > > > > > >> client
> >>> > > > > > > > >> >> > side
> >>> > > > > > > > >> >> > > > >> > timestamp would work mainly because 1
> and
> >>> 2.
> >>> > > There
> >>> > > > > > are a
> >>> > > > > > > > few
> >>> > > > > > > > >> >> > > > situations
> >>> > > > > > > > >> >> > > > >> I
> >>> > > > > > > > >> >> > > > >> > mentioned in the KIP.
> >>> > > > > > > > >> >> > > > >> > >
> >>> > > > > > > > >> >> > > > >> > > When you are saying it won't work you
> >>> are
> >>> > > > assuming
> >>> > > > > > > some
> >>> > > > > > > > >> >> > particular
> >>> > > > > > > > >> >> > > > >> > > implementation? Maybe that the index
> is
> >>> a
> >>> > > > > > > monotonically
> >>> > > > > > > > >> >> > increasing
> >>> > > > > > > > >> >> > > > >> set of
> >>> > > > > > > > >> >> > > > >> > > pointers to the least record with a
> >>> > timestamp
> >>> > > > > larger
> >>> > > > > > > > than
> >>> > > > > > > > >> the
> >>> > > > > > > > >> >> > > index
> >>> > > > > > > > >> >> > > > >> time?
> >>> > > > > > > > >> >> > > > >> > > In other words a search for time X
> >>> gives the
> >>> > > > > largest
> >>> > > > > > > > offset
> >>> > > > > > > > >> >> at
> >>> > > > > > > > >> >> > > which
> >>> > > > > > > > >> >> > > > >> all
> >>> > > > > > > > >> >> > > > >> > > records are <= X?
> >>> > > > > > > > >> >> > > > >> > It is a promising idea. We probably can
> >>> have
> >>> > an
> >>> > > > > > > in-memory
> >>> > > > > > > > >> index
> >>> > > > > > > > >> >> > like
> >>> > > > > > > > >> >> > > > >> that,
> >>> > > > > > > > >> >> > > > >> > but might be complicated to have a file
> on
> >>> > disk
> >>> > > > like
> >>> > > > > > > that.
> >>> > > > > > > > >> >> Imagine
> >>> > > > > > > > >> >> > > > there
> >>> > > > > > > > >> >> > > > >> > are two timestamps T0 < T1. We see
> >>> message Y
> >>> > > > created
> >>> > > > > > at
> >>> > > > > > > T1
> >>> > > > > > > > >> and
> >>> > > > > > > > >> >> > > created
> >>> > > > > > > > >> >> > > > >> > index like [T1->Y], then we see message
> >>> > created
> >>> > > at
> >>> > > > > T1,
> >>> > > > > > > > >> >> supposedly
> >>> > > > > > > > >> >> > we
> >>> > > > > > > > >> >> > > > >> should
> >>> > > > > > > > >> >> > > > >> > have index look like [T0->X, T1->Y], it
> is
> >>> > easy
> >>> > > to
> >>> > > > > do
> >>> > > > > > in
> >>> > > > > > > > >> >> memory,
> >>> > > > > > > > >> >> > but
> >>> > > > > > > > >> >> > > > we
> >>> > > > > > > > >> >> > > > >> > might have to rewrite the index file
> >>> > completely.
> >>> > > > > Maybe
> >>> > > > > > > we
> >>> > > > > > > > can
> >>> > > > > > > > >> >> have
> >>> > > > > > > > >> >> > > the
> >>> > > > > > > > >> >> > > > >> > first entry with timestamp to 0, and
> only
> >>> > update
> >>> > > > the
> >>> > > > > > > first
> >>> > > > > > > > >> >> pointer
> >>> > > > > > > > >> >> > > for
> >>> > > > > > > > >> >> > > > >> any
> >>> > > > > > > > >> >> > > > >> > out of range timestamp, so the index
> will
> >>> be
> >>> > > > [0->X,
> >>> > > > > > > > T1->Y].
> >>> > > > > > > > >> >> Also,
> >>> > > > > > > > >> >> > > the
> >>> > > > > > > > >> >> > > > >> range
> >>> > > > > > > > >> >> > > > >> > of timestamps in the log segments can
> >>> overlap
> >>> > > with
> >>> > > > > > each
> >>> > > > > > > > >> other.
> >>> > > > > > > > >> >> > That
> >>> > > > > > > > >> >> > > > >> means
> >>> > > > > > > > >> >> > > > >> > we either need to keep a cross segments
> >>> index
> >>> > > file
> >>> > > > > or
> >>> > > > > > we
> >>> > > > > > > > need
> >>> > > > > > > > >> >> to
> >>> > > > > > > > >> >> > > check
> >>> > > > > > > > >> >> > > > >> all
> >>> > > > > > > > >> >> > > > >> > the index file for each log segment.
> >>> > > > > > > > >> >> > > > >> > I separated out the time based log index
> >>> to
> >>> > > KIP-33
> >>> > > > > > > > because it
> >>> > > > > > > > >> >> can
> >>> > > > > > > > >> >> > be
> >>> > > > > > > > >> >> > > > an
> >>> > > > > > > > >> >> > > > >> > independent follow up feature as Neha
> >>> > > suggested. I
> >>> > > > > > will
> >>> > > > > > > > try
> >>> > > > > > > > >> to
> >>> > > > > > > > >> >> > make
> >>> > > > > > > > >> >> > > > the
> >>> > > > > > > > >> >> > > > >> > time based index work with client side
> >>> > > timestamp.
> >>> > > > > > > > >> >> > > > >> > >
> >>> > > > > > > > >> >> > > > >> > > For retention, I agree with the
> problem
> >>> you
> >>> > > > point
> >>> > > > > > out,
> >>> > > > > > > > but
> >>> > > > > > > > >> I
> >>> > > > > > > > >> >> > think
> >>> > > > > > > > >> >> > > > >> what
> >>> > > > > > > > >> >> > > > >> > you
> >>> > > > > > > > >> >> > > > >> > > are saying in that case is that you
> >>> want a
> >>> > > size
> >>> > > > > > limit
> >>> > > > > > > > too.
> >>> > > > > > > > >> If
> >>> > > > > > > > >> >> > you
> >>> > > > > > > > >> >> > > > use
> >>> > > > > > > > >> >> > > > >> > > system time you actually hit the same
> >>> > problem:
> >>> > > > say
> >>> > > > > > you
> >>> > > > > > > > do a
> >>> > > > > > > > >> >> full
> >>> > > > > > > > >> >> > > > dump
> >>> > > > > > > > >> >> > > > >> of
> >>> > > > > > > > >> >> > > > >> > a
> >>> > > > > > > > >> >> > > > >> > > DB table with a setting of 7 days
> >>> retention,
> >>> > > > your
> >>> > > > > > > > retention
> >>> > > > > > > > >> >> will
> >>> > > > > > > > >> >> > > > >> actually
> >>> > > > > > > > >> >> > > > >> > > not get enforced for the first 7 days
> >>> > because
> >>> > > > the
> >>> > > > > > data
> >>> > > > > > > > is
> >>> > > > > > > > >> >> "new
> >>> > > > > > > > >> >> > to
> >>> > > > > > > > >> >> > > > >> Kafka".
> >>> > > > > > > > >> >> > > > >> > I kind of think the size limit here is
> >>> > > orthogonal.
> >>> > > > > It
> >>> > > > > > > is a
> >>> > > > > > > > >> >> valid
> >>> > > > > > > > >> >> > use
> >>> > > > > > > > >> >> > > > >> case
> >>> > > > > > > > >> >> > > > >> > where people only want to use time based
> >>> > > retention
> >>> > > > > > only.
> >>> > > > > > > > In
> >>> > > > > > > > >> >> your
> >>> > > > > > > > >> >> > > > >> example,
> >>> > > > > > > > >> >> > > > >> > depending on client timestamp might
> break
> >>> the
> >>> > > > > > > > functionality -
> >>> > > > > > > > >> >> say
> >>> > > > > > > > >> >> > it
> >>> > > > > > > > >> >> > > > is
> >>> > > > > > > > >> >> > > > >> a
> >>> > > > > > > > >> >> > > > >> > bootstrap case people actually need to
> >>> read
> >>> > all
> >>> > > > the
> >>> > > > > > > data.
> >>> > > > > > > > If
> >>> > > > > > > > >> we
> >>> > > > > > > > >> >> > > depend
> >>> > > > > > > > >> >> > > > >> on
> >>> > > > > > > > >> >> > > > >> > the client timestamp, the data might be
> >>> > deleted
> >>> > > > > > > instantly
> >>> > > > > > > > >> when
> >>> > > > > > > > >> >> > they
> >>> > > > > > > > >> >> > > > >> come to
> >>> > > > > > > > >> >> > > > >> > the broker. It might be too demanding to
> >>> > expect
> >>> > > > the
> >>> > > > > > > > broker to
> >>> > > > > > > > >> >> > > > understand
> >>> > > > > > > > >> >> > > > >> > what people actually want to do with the
> >>> data
> >>> > > > coming
> >>> > > > > > in.
> >>> > > > > > > > So
> >>> > > > > > > > >> the
> >>> > > > > > > > >> >> > > > >> guarantee
> >>> > > > > > > > >> >> > > > >> > of using server side timestamp is that
> >>> "after
> >>> > > > > appended
> >>> > > > > > > to
> >>> > > > > > > > the
> >>> > > > > > > > >> >> log,
> >>> > > > > > > > >> >> > > all
> >>> > > > > > > > >> >> > > > >> > messages will be available on broker for
> >>> > > retention
> >>> > > > > > > time",
> >>> > > > > > > > >> >> which is
> >>> > > > > > > > >> >> > > not
> >>> > > > > > > > >> >> > > > >> > changeable by clients.
> >>> > > > > > > > >> >> > > > >> > >
> >>> > > > > > > > >> >> > > > >> > > -Jay
> >>> > > > > > > > >> >> > > > >> >
> >>> > > > > > > > >> >> > > > >> > On Thu, Sep 10, 2015 at 12:55 PM,
> Jiangjie
> >>> > Qin <
> >>> > > > > > > > >> >> j...@linkedin.com
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > > > >> wrote:
> >>> > > > > > > > >> >> > > > >> >
> >>> > > > > > > > >> >> > > > >> >> Hi folks,
> >>> > > > > > > > >> >> > > > >> >>
> >>> > > > > > > > >> >> > > > >> >> This proposal was previously in KIP-31
> >>> and we
> >>> > > > > > separated
> >>> > > > > > > > it
> >>> > > > > > > > >> to
> >>> > > > > > > > >> >> > > KIP-32
> >>> > > > > > > > >> >> > > > >> per
> >>> > > > > > > > >> >> > > > >> >> Neha and Jay's suggestion.
> >>> > > > > > > > >> >> > > > >> >>
> >>> > > > > > > > >> >> > > > >> >> The proposal is to add the following
> two
> >>> > > > timestamps
> >>> > > > > > to
> >>> > > > > > > > Kafka
> >>> > > > > > > > >> >> > > message.
> >>> > > > > > > > >> >> > > > >> >> - CreateTime
> >>> > > > > > > > >> >> > > > >> >> - LogAppendTime
> >>> > > > > > > > >> >> > > > >> >>
> >>> > > > > > > > >> >> > > > >> >> The CreateTime will be set by the
> >>> producer
> >>> > and
> >>> > > > will
> >>> > > > > > > > change
> >>> > > > > > > > >> >> after
> >>> > > > > > > > >> >> > > > that.
> >>> > > > > > > > >> >> > > > >> >> The LogAppendTime will be set by broker
> >>> for
> >>> > > > purpose
> >>> > > > > > > such
> >>> > > > > > > > as
> >>> > > > > > > > >> >> > enforce
> >>> > > > > > > > >> >> > > > log
> >>> > > > > > > > >> >> > > > >> >> retention and log rolling.
> >>> > > > > > > > >> >> > > > >> >>
> >>> > > > > > > > >> >> > > > >> >> Thanks,
> >>> > > > > > > > >> >> > > > >> >>
> >>> > > > > > > > >> >> > > > >> >> Jiangjie (Becket) Qin
> >>> > > > > > > > >> >> > > > >> >>
> >>> > > > > > > > >> >> > > > >> >>
> >>> > > > > > > > >> >> > > > >> >
> >>> > > > > > > > >> >> > > > >>
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > > >
> >>> > > > > > > > >> >> > > >
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> > > --
> >>> > > > > > > > >> >> > > -- Guozhang
> >>> > > > > > > > >> >> > >
> >>> > > > > > > > >> >> >
> >>> > > > > > > > >> >>
> >>> > > > > > > > >> >
> >>> > > > > > > > >> >
> >>> > > > > > > > >>
> >>> > > > > > > >
> >>> > > > > > > >
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >
>



-- 
-- Guozhang

Reply via email to