Hi Guozhang,

That makes sense. I will update the KIP wiki and bump up the voting thread
to let people know about this change.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 10:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> One motivation of my proposal is actually to avoid having any clients try to
> read the timestamp type from the topic metadata response and behave
> differently, since:
>
> 1) the topic metadata response is not always in sync with the source of truth
> (ZK), hence by the time the clients realize that the config has changed it may
> already be too late (i.e. for a consumer, records with the wrong timestamp
> could already have been returned to the user).
>
> 2) the client logic would be a bit simpler, and this will benefit non-Java
> development a lot. Also, we can avoid adding this field to the topic metadata
> response.
>
> Guozhang
>
> On Tue, Jan 26, 2016 at 3:20 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > My hesitation about the changed protocol is that if we are going to have
> > topic configuration returned in the topic metadata, the current protocol
> > makes more sense, because the timestamp type is a topic-level setting, so we
> > don't need to put it into each message. That assumes a timestamp type
> > change on a topic rarely happens and that, if it is ever needed, the
> > existing data should be wiped out.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Jan 26, 2016 at 2:07 PM, Becket Qin <becket....@gmail.com>
> wrote:
> >
> > > Bumping up this thread per discussion on the KIP hangout.
> > >
> > > During the implementation of the KIP, Guozhang raised another proposal on
> > > how to indicate the message timestamp type used by messages, so we want
> > > to hear people's opinions on this proposal.
> > >
> > > The current and the new proposal differ only on messages that are a)
> > > compressed, and b) using LogAppendTime.
> > >
> > > For compressed messages using LogAppendTime, the timestamps in the current
> > > proposal are handled as below:
> > > 1. When a producer produces the messages, it tries to set the timestamp to
> > > -1 for inner messages if it knows LogAppendTime is used.
> > > 2. When a broker receives the messages, it will overwrite the timestamp of
> > > each inner message to -1 if needed and write the server time to the wrapper
> > > message. The broker will do re-compression if an inner message timestamp is
> > > overwritten.
> > > 3. When a consumer receives the messages, it will see that the inner
> > > message timestamp is -1, so the wrapper message timestamp is used.
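> > >
> > > As a minimal sketch (in Java, with made-up method names rather than the
> > > actual client API), the consumer-side resolution in step 3 would look
> > > like:
> > >
> > >     // Hypothetical helper: resolves the effective timestamp of an inner
> > >     // message. -1 marks "no create time", so we fall back to the
> > >     // broker-assigned LogAppendTime carried by the wrapper message.
> > >     long resolveTimestamp(long innerTimestamp, long wrapperTimestamp) {
> > >         return innerTimestamp == -1L ? wrapperTimestamp : innerTimestamp;
> > >     }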
> > >
> > > Implementation wise, this proposal requires the producer to set the
> > > timestamp of inner messages correctly to avoid broker-side re-compression.
> > > To do that, the short-term solution is to let the producer infer the
> > > timestamp type from the ProduceResponse returned by the broker and set the
> > > correct timestamp afterwards. This means the first few batches will still
> > > need re-compression on the broker. The long-term solution is to have the
> > > producer get the topic configuration during metadata updates.
> > >
> > >
> > > The proposed modification is:
> > > 1. When a producer produces the messages, it always uses create time.
> > > 2. When a broker receives the messages, it ignores the inner message
> > > timestamps, simply sets a wrapper message timestamp type attribute bit to
> > > 1, and sets the timestamp of the wrapper message to the server time. (The
> > > broker will also set the timestamp type attribute bit accordingly for
> > > non-compressed messages using LogAppendTime.)
> > > 3. When a consumer receives the messages, it checks the timestamp type
> > > attribute bit of the wrapper message. If it is set to 1, the inner
> > > message's timestamp will be ignored and the wrapper message's timestamp
> > > will be used.
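> > >
> > > A sketch of the consumer-side check under the modified protocol. The
> > > exact position of the timestamp type flag in the attribute byte is an
> > > assumption here, not something this thread has pinned down:
> > >
> > >     // Hypothetical position of the timestamp type bit in the wrapper
> > >     // message's attribute byte.
> > >     static final int TIMESTAMP_TYPE_MASK = 0x08;
> > >
> > >     long resolveTimestamp(byte wrapperAttributes, long wrapperTimestamp,
> > >                           long innerTimestamp) {
> > >         // Bit set to 1 => the broker stamped LogAppendTime on the wrapper,
> > >         // so the producer-supplied inner timestamp is ignored.
> > >         boolean logAppendTime = (wrapperAttributes & TIMESTAMP_TYPE_MASK) != 0;
> > >         return logAppendTime ? wrapperTimestamp : innerTimestamp;
> > >     }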
> > >
> > > This approach uses an extra attribute bit. The good thing about the
> > > modified protocol is that consumers will be able to know the timestamp
> > > type, and re-compression on the broker side is completely avoided no
> > > matter what value is sent by the producer. The downside is that in this
> > > approach the inner messages will have wrong timestamps.
> > >
> > > We want to see if people have concerns over the modified approach.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Dec 15, 2015 at 11:45 AM, Becket Qin <becket....@gmail.com>
> > wrote:
> > >
> > >> Jun,
> > >>
> > >> 1. I agree it would be nice to have the timestamps used in a unified way.
> > >> My concern is that if we let the server change the timestamp of the inner
> > >> messages for LogAppendTime, that will force users who are using
> > >> LogAppendTime to always pay the re-compression penalty, so for them
> > >> LogAppendTime makes KIP-31 in vain.
> > >>
> > >> 4. If there are no entries in the log segment, we can read from the time
> > >> index of the previous log segments. If no previous entry is available
> > >> after we search back to the earliest log segment, that means all the
> > >> previous log segments with a valid time index entry have been deleted. In
> > >> that case there should supposedly be only one log segment left - the
> > >> active log segment - and we can simply set the latest timestamp to 0.
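> > >>
> > >> A rough sketch of that recovery path (the segment and index types here
> > >> are made up for illustration, not the actual broker classes):
> > >>
> > >>     // Walk the segments from newest to oldest and take the last time
> > >>     // index entry of the first segment that has one. If no segment has
> > >>     // an entry, fall back to 0 as described above.
> > >>     long recoverLatestTimestamp(List<LogSegment> segmentsNewestFirst) {
> > >>         for (LogSegment segment : segmentsNewestFirst) {
> > >>             if (!segment.timeIndex().isEmpty())
> > >>                 return segment.timeIndex().lastEntry().timestamp;
> > >>         }
> > >>         return 0L;
> > >>     }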
> > >>
> > >> Guozhang,
> > >>
> > >> Sorry for the confusion. By "the timestamp of the latest message" I
> > >> actually meant "the timestamp of the message with the largest timestamp".
> > >> So in your example the "latest message" is 5.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >>
> > >> On Tue, Dec 15, 2015 at 10:02 AM, Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >>
> > >>> Jun, Jiangjie,
> > >>>
> > >>> I am confused about 3) here: if we use "the timestamp of the latest
> > >>> message", then doesn't this mean we will roll the log whenever a message
> > >>> delayed by the rolling time is received as well? Just to clarify, my
> > >>> understanding of "the timestamp of the latest message", for example in
> > >>> the following log, is 1, not 5:
> > >>>
> > >>> 2, 3, 4, 5, 1
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Mon, Dec 14, 2015 at 10:05 PM, Jun Rao <j...@confluent.io> wrote:
> > >>>
> > >>> > 1. Hmm, it's more intuitive if the consumer sees the same timestamp
> > >>> > whether the messages are compressed or not. When
> > >>> > message.timestamp.type=LogAppendTime, we will need to set the timestamp
> > >>> > in each message if messages are not compressed, so that the follower
> > >>> > can get the same timestamp. So, it seems that we should do the same
> > >>> > thing for inner messages when messages are compressed.
> > >>> >
> > >>> > 4. I thought on startup, we restore the timestamp of the latest message
> > >>> > by reading from the time index of the last log segment. So, what
> > >>> > happens if there are no index entries?
> > >>> >
> > >>> > Thanks,
> > >>> >
> > >>> > Jun
> > >>> >
> > >>> > On Mon, Dec 14, 2015 at 6:28 PM, Becket Qin <becket....@gmail.com>
> > >>> wrote:
> > >>> >
> > >>> > > Thanks for the explanation, Jun.
> > >>> > >
> > >>> > > 1. That makes sense. So maybe we can do the following:
> > >>> > > (a) Set the timestamp in the compressed message to the latest
> > >>> > > timestamp of all its inner messages. This works for both
> > >>> > > LogAppendTime and CreateTime.
> > >>> > > (b) If message.timestamp.type=LogAppendTime, the broker will
> > >>> > > overwrite all the inner message timestamps to -1 if they are not set
> > >>> > > to -1. This is mainly for topics that are using LogAppendTime.
> > >>> > > Hopefully the producer will set the timestamp to -1 in the
> > >>> > > ProducerRecord to avoid server-side re-compression (see the sketch
> > >>> > > below).
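> > >>> > >
> > >>> > > A minimal sketch of the broker-side handling in (a) and (b); the
> > >>> > > message type and accessors are hypothetical:
> > >>> > >
> > >>> > >     // Hypothetical message type with a timestamp() accessor.
> > >>> > >     long wrapperTimestampFor(boolean logAppendTime, long nowMs,
> > >>> > >                              List<InnerMessage> innerMessages) {
> > >>> > >         if (!logAppendTime) {
> > >>> > >             // (a) CreateTime: the wrapper carries the latest
> > >>> > >             // timestamp of all its inner messages.
> > >>> > >             long latest = Long.MIN_VALUE;
> > >>> > >             for (InnerMessage m : innerMessages)
> > >>> > >                 latest = Math.max(latest, m.timestamp());
> > >>> > >             return latest;
> > >>> > >         }
> > >>> > >         // (b) LogAppendTime: inner timestamps should already be -1;
> > >>> > >         // any other value forces a rewrite, i.e. re-compression.
> > >>> > >         return nowMs;
> > >>> > >     }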
> > >>> > >
> > >>> > > 3. I see. That works. So the semantics of log rolling become "roll
> > >>> > > the log segment if no new message has arrived within the rolling
> > >>> > > interval after the latest message."
> > >>> > >
> > >>> > > 4. Yes. If the largest timestamp is in the previous log segment, the
> > >>> > > time index for the current log segment does not have a valid offset
> > >>> > > in the current log segment to point to. Maybe in that case we should
> > >>> > > build an empty log index.
> > >>> > >
> > >>> > > Thanks,
> > >>> > >
> > >>> > > Jiangjie (Becket) Qin
> > >>> > >
> > >>> > >
> > >>> > >
> > >>> > > On Mon, Dec 14, 2015 at 5:51 PM, Jun Rao <j...@confluent.io>
> wrote:
> > >>> > >
> > >>> > > > 1. I was thinking more about saving the decompression overhead in
> > >>> > > > the follower. Currently, the follower doesn't decompress the
> > >>> > > > messages. To keep it that way, the outer message needs to include
> > >>> > > > the timestamp of the latest inner message to build the time index
> > >>> > > > in the follower. The simplest thing to do is to change the
> > >>> > > > timestamp in the inner messages if necessary, in which case there
> > >>> > > > will be the recompression overhead. However, in the case when the
> > >>> > > > timestamps of the inner messages don't have to be changed
> > >>> > > > (hopefully more common), there won't be the recompression overhead.
> > >>> > > > In either case, we always set the timestamp in the outer message to
> > >>> > > > be the timestamp of the latest inner message, in the leader.
> > >>> > > >
> > >>> > > > 3. Basically, in each log segment, we keep track of the timestamp
> > >>> > > > of the latest message. If the current time minus the timestamp of
> > >>> > > > the latest message exceeds the log rolling interval, we roll a new
> > >>> > > > log segment. So, if messages with later timestamps keep getting
> > >>> > > > added, we only roll new log segments based on size. On the other
> > >>> > > > hand, if no new messages are added to a log, we can force a log
> > >>> > > > roll based on time, which addresses the issue in (b).
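> > >>> > > >
> > >>> > > > A sketch of that condition (the names are made up):
> > >>> > > >
> > >>> > > >     // Roll on size, or on time measured from the latest message's
> > >>> > > >     // timestamp rather than from the segment's creation time.
> > >>> > > >     boolean shouldRoll(long segmentBytes, long maxSegmentBytes,
> > >>> > > >                        long latestTimestamp, long rollIntervalMs,
> > >>> > > >                        long nowMs) {
> > >>> > > >         return segmentBytes >= maxSegmentBytes
> > >>> > > >                 || nowMs - latestTimestamp > rollIntervalMs;
> > >>> > > >     }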
> > >>> > > >
> > >>> > > > 4. Hmm, the index is per segment and should only point to
> > >>> > > > positions in the corresponding .log file, not previous ones, right?
> > >>> > > >
> > >>> > > > Thanks,
> > >>> > > >
> > >>> > > > Jun
> > >>> > > >
> > >>> > > >
> > >>> > > >
> > >>> > > > On Mon, Dec 14, 2015 at 3:10 PM, Becket Qin <
> > becket....@gmail.com>
> > >>> > > wrote:
> > >>> > > >
> > >>> > > > > Hi Jun,
> > >>> > > > >
> > >>> > > > > Thanks a lot for the comments. Please see inline replies.
> > >>> > > > >
> > >>> > > > > Thanks,
> > >>> > > > >
> > >>> > > > > Jiangjie (Becket) Qin
> > >>> > > > >
> > >>> > > > > On Mon, Dec 14, 2015 at 10:19 AM, Jun Rao <j...@confluent.io>
> > >>> wrote:
> > >>> > > > >
> > >>> > > > > > Hi, Becket,
> > >>> > > > > >
> > >>> > > > > > Thanks for the proposal. Looks good overall. A few comments
> > >>> below.
> > >>> > > > > >
> > >>> > > > > > 1. KIP-32 didn't say what timestamp should be set in a
> > >>> > > > > > compressed message. We probably should set it to the timestamp
> > >>> > > > > > of the latest message included in the compressed one. This way,
> > >>> > > > > > during indexing, we don't have to decompress the message.
> > >>> > > > > >
> > >>> > > > > That is a good point.
> > >>> > > > > In normal cases, the broker needs to decompress the messages for
> > >>> > > > > verification purposes anyway, so building the time index does not
> > >>> > > > > add additional decompression. During time index recovery,
> > >>> > > > > however, having a timestamp in the compressed message might save
> > >>> > > > > the decompression.
> > >>> > > > >
> > >>> > > > > Another thing I am thinking is that we should make sure KIP-32
> > >>> > > > > works well with KIP-31, i.e. we don't want to do re-compression
> > >>> > > > > in order to add timestamps to messages.
> > >>> > > > > With the approach in my last email, the timestamps in the
> > >>> > > > > messages will either all be overwritten by the server if
> > >>> > > > > message.timestamp.type=LogAppendTime, or they will not be
> > >>> > > > > overwritten if message.timestamp.type=CreateTime.
> > >>> > > > >
> > >>> > > > > Maybe we can use the timestamp in compressed messages in the
> > >>> > > > > following way:
> > >>> > > > > If message.timestamp.type=LogAppendTime, we have to overwrite the
> > >>> > > > > timestamps for all the messages. We can simply write the
> > >>> > > > > timestamp in the compressed message to avoid re-compression.
> > >>> > > > > If message.timestamp.type=CreateTime, we do not need to overwrite
> > >>> > > > > the timestamps. We either reject the entire compressed message or
> > >>> > > > > we just leave the compressed message timestamp as -1.
> > >>> > > > >
> > >>> > > > > So the semantics of the timestamp field in the compressed message
> > >>> > > > > become: if it is greater than 0, that means LogAppendTime is
> > >>> > > > > used, and the timestamp of the inner messages is the compressed
> > >>> > > > > message's LogAppendTime. If it is -1, that means CreateTime is
> > >>> > > > > used, and the timestamp is in each individual inner message.
> > >>> > > > >
> > >>> > > > > This sacrifices the speed of recovery but seems worthwhile
> > >>> > > > > because we avoid re-compression.
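> > >>> > > > >
> > >>> > > > > A sketch of a reader interpreting the field under these
> > >>> > > > > semantics (a hypothetical helper, not actual client code):
> > >>> > > > >
> > >>> > > > >     // wrapperTimestamp > 0: LogAppendTime, shared by every inner
> > >>> > > > >     // message in the batch.
> > >>> > > > >     // wrapperTimestamp == -1: CreateTime, carried by each inner
> > >>> > > > >     // message individually.
> > >>> > > > >     long effectiveTimestamp(long wrapperTimestamp, long innerTimestamp) {
> > >>> > > > >         return wrapperTimestamp > 0 ? wrapperTimestamp : innerTimestamp;
> > >>> > > > >     }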
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > > 2. In KIP-33, should we make the time-based index interval
> > >>> > > > > > configurable? Perhaps we can default it to 60 secs, but allow
> > >>> > > > > > users to configure it to smaller values if they want more
> > >>> > > > > > precision.
> > >>> > > > > >
> > >>> > > > > Yes, we can do that.
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > > 3. In KIP-33, I am not sure if log rolling should be based on
> > >>> > > > > > the earliest message. This would mean that we will need to roll
> > >>> > > > > > a log segment every time we get a message delayed by the log
> > >>> > > > > > rolling time interval. Also, on broker startup, we can get the
> > >>> > > > > > timestamp of the latest message in a log segment pretty
> > >>> > > > > > efficiently by just looking at the last time index entry, but
> > >>> > > > > > getting the earliest timestamp requires a full scan of all log
> > >>> > > > > > segments, which can be expensive. Previously, there were two
> > >>> > > > > > use cases for time-based rolling: (a) more accurate time-based
> > >>> > > > > > indexing and (b) retaining data by time (since the active
> > >>> > > > > > segment is never deleted). (a) is already solved with a
> > >>> > > > > > time-based index. For (b), if the retention is based on the
> > >>> > > > > > timestamp of the latest message in a log segment, perhaps log
> > >>> > > > > > rolling should be based on that too.
> > >>> > > > >
> > >>> > > > > I am not sure how to make log rolling work with the latest
> > >>> > > > > timestamp in the current log segment. Do you mean the log rolling
> > >>> > > > > can be based on the last log segment's latest timestamp? If so,
> > >>> > > > > how do we roll out the first segment?
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > > 4. In KIP-33, I presume the timestamp in the time index will
> > >>> > > > > > be monotonically increasing. So, if all messages in a log
> > >>> > > > > > segment have a timestamp less than the largest timestamp in the
> > >>> > > > > > previous log segment, we will use the latter to index this log
> > >>> > > > > > segment?
> > >>> > > > > >
> > >>> > > > > Yes. The timestamps are monotonically increasing. If the largest
> > >>> > > > > timestamp in the previous segment is very big, it is possible
> > >>> > > > > that the time index of the current segment only has two index
> > >>> > > > > entries (inserted during segment creation and roll out), both
> > >>> > > > > pointing to a message in the previous log segment. This is the
> > >>> > > > > corner case I mentioned before, where we should expire the next
> > >>> > > > > log segment even before expiring the previous log segment, just
> > >>> > > > > because the largest timestamp is in the previous log segment. In
> > >>> > > > > the current approach, we will wait until the previous log segment
> > >>> > > > > expires, and then delete both the previous log segment and the
> > >>> > > > > next log segment.
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > > 5. In KIP-32, in the wire protocol, we mention both timestamp
> > >>> > > > > > and time. They should be consistent.
> > >>> > > > > >
> > >>> > > > > Will fix the wiki page.
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > > Jun
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > On Thu, Dec 10, 2015 at 10:13 AM, Becket Qin <
> > >>> becket....@gmail.com
> > >>> > >
> > >>> > > > > wrote:
> > >>> > > > > >
> > >>> > > > > > > Hey Jay,
> > >>> > > > > > >
> > >>> > > > > > > Thanks for the comments.
> > >>> > > > > > >
> > >>> > > > > > > Good point about the actions to take when
> > >>> > > > > > > max.message.time.difference is exceeded. Rejection is a
> > >>> > > > > > > useful behavior, although I cannot think of a use case at
> > >>> > > > > > > LinkedIn at this moment. I think it makes sense to add a
> > >>> > > > > > > configuration.
> > >>> > > > > > >
> > >>> > > > > > > How about the following configurations?
> > >>> > > > > > > 1. message.timestamp.type=CreateTime/LogAppendTime
> > >>> > > > > > > 2. max.message.time.difference.ms
> > >>> > > > > > >
> > >>> > > > > > > If message.timestamp.type is set to CreateTime, when the
> > >>> > > > > > > broker receives a message, it will further check
> > >>> > > > > > > max.message.time.difference.ms, and will reject the message
> > >>> > > > > > > if the time difference exceeds the threshold.
> > >>> > > > > > > If message.timestamp.type is set to LogAppendTime, the broker
> > >>> > > > > > > will always stamp the message with the current server time,
> > >>> > > > > > > regardless of the value of max.message.time.difference.ms.
> > >>> > > > > > >
> > >>> > > > > > > This will make sure the messages on a broker use either
> > >>> > > > > > > CreateTime or LogAppendTime, but not a mixture of both.
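> > >>> > > > > > >
> > >>> > > > > > > A sketch of that broker-side check (a hypothetical method,
> > >>> > > > > > > just to make the two branches concrete):
> > >>> > > > > > >
> > >>> > > > > > >     long validateOrStamp(boolean logAppendTime, long messageTimestamp,
> > >>> > > > > > >                          long nowMs, long maxDifferenceMs) {
> > >>> > > > > > >         if (logAppendTime)
> > >>> > > > > > >             return nowMs; // always stamp with the server time
> > >>> > > > > > >         if (Math.abs(nowMs - messageTimestamp) > maxDifferenceMs)
> > >>> > > > > > >             throw new IllegalArgumentException(
> > >>> > > > > > >                     "timestamp out of range"); // reject
> > >>> > > > > > >         return messageTimestamp; // CreateTime kept as-is
> > >>> > > > > > >     }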
> > >>> > > > > > >
> > >>> > > > > > > What do you think?
> > >>> > > > > > >
> > >>> > > > > > > Thanks,
> > >>> > > > > > >
> > >>> > > > > > > Jiangjie (Becket) Qin
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > On Wed, Dec 9, 2015 at 2:42 PM, Jay Kreps <
> > j...@confluent.io>
> > >>> > > wrote:
> > >>> > > > > > >
> > >>> > > > > > > > Hey Becket,
> > >>> > > > > > > >
> > >>> > > > > > > > That summary of pros and cons sounds about right to me.
> > >>> > > > > > > >
> > >>> > > > > > > > There are potentially two actions you could take when
> > >>> > > > > > > > max.message.time.difference is exceeded--override it or
> > >>> > > > > > > > reject the message entirely. Can we pick one of these or
> > >>> > > > > > > > does the action need to be configurable too? (I'm not
> > >>> > > > > > > > sure.) The downside of more configuration is that it is
> > >>> > > > > > > > more fiddly and has more modes.
> > >>> > > > > > > >
> > >>> > > > > > > > I suppose the reason I was thinking of this as a
> > >>> > > > > > > > "difference" rather than a hard type was that if you were
> > >>> > > > > > > > going to go the reject route you would need some tolerance
> > >>> > > > > > > > setting (i.e. if your SLA is that if your timestamp is off
> > >>> > > > > > > > by more than 10 minutes I give you an error). I agree with
> > >>> > > > > > > > you that having one field that potentially contains a mix
> > >>> > > > > > > > of two values is a bit weird.
> > >>> > > > > > > >
> > >>> > > > > > > > -Jay
> > >>> > > > > > > >
> > >>> > > > > > > > On Mon, Dec 7, 2015 at 5:17 PM, Becket Qin <
> > >>> > becket....@gmail.com
> > >>> > > >
> > >>> > > > > > wrote:
> > >>> > > > > > > > > It looks like the format of the previous email was
> > >>> > > > > > > > > messed up. Sending it again.
> > >>> > > > > > > > >
> > >>> > > > > > > > > Just to recap, the last proposal Jay made (with some
> > >>> > > > > > > > > implementation details added) was:
> > >>> > > > > > > > >
> > >>> > > > > > > > > 1. Allow the user to stamp the message at produce time.
> > >>> > > > > > > > >
> > >>> > > > > > > > > 2. When a broker receives a message, it takes a look at
> > >>> > > > > > > > > the difference between its local time and the timestamp
> > >>> > > > > > > > > in the message (see the sketch below).
> > >>> > > > > > > > >   a. If the time difference is within a configurable
> > >>> > > > > > > > > max.message.time.difference.ms, the server will accept it
> > >>> > > > > > > > > and append it to the log.
> > >>> > > > > > > > >   b. If the time difference is beyond the configured
> > >>> > > > > > > > > max.message.time.difference.ms, the server will override
> > >>> > > > > > > > > the timestamp with its current local time and append the
> > >>> > > > > > > > > message to the log.
> > >>> > > > > > > > >   c. The default value of max.message.time.difference
> > >>> > > > > > > > > would be set to Long.MaxValue.
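> > >>> > > > > > > > >
> > >>> > > > > > > > > A sketch of step 2 (hypothetical names); note that here
> > >>> > > > > > > > > an out-of-range timestamp is overridden rather than
> > >>> > > > > > > > > rejected:
> > >>> > > > > > > > >
> > >>> > > > > > > > >     long resolveTimestamp(long messageTimestamp, long nowMs,
> > >>> > > > > > > > >                           long maxTimeDifferenceMs) {
> > >>> > > > > > > > >         // Within the tolerance: keep the producer-supplied
> > >>> > > > > > > > >         // timestamp. Beyond it: use the broker's local time.
> > >>> > > > > > > > >         return Math.abs(nowMs - messageTimestamp) <= maxTimeDifferenceMs
> > >>> > > > > > > > >                 ? messageTimestamp
> > >>> > > > > > > > >                 : nowMs;
> > >>> > > > > > > > >     }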
> > >>> > > > > > > > >
> > >>> > > > > > > > > 3. The configurable time difference threshold
> > >>> > > > > > > > > max.message.time.difference.ms will be a per-topic
> > >>> > > > > > > > > configuration.
> > >>> > > > > > > > >
> > >>> > > > > > > > > 4. The index will be built so it has the following
> > >>> > > > > > > > > guarantee:
> > >>> > > > > > > > >   a. If the user searches by timestamp:
> > >>> > > > > > > > >       - all the messages after that timestamp will be
> > >>> > > > > > > > > consumed;
> > >>> > > > > > > > >       - the user might see earlier messages.
> > >>> > > > > > > > >   b. Log retention will take a look at the last time
> > >>> > > > > > > > > index entry in the time index file, because the last
> > >>> > > > > > > > > entry will be the latest timestamp in the entire log
> > >>> > > > > > > > > segment. If that entry expires, the log segment will be
> > >>> > > > > > > > > deleted (see the sketch after this list).
> > >>> > > > > > > > >   c. Log rolling has to depend on the earliest timestamp.
> > >>> > > > > > > > > In this case we may need to keep an in-memory timestamp
> > >>> > > > > > > > > only for the current active log. On recovery, we will
> > >>> > > > > > > > > need to read the active log segment to get this timestamp
> > >>> > > > > > > > > of the earliest messages.
> > >>> > > > > > > > >
> > >>> > > > > > > > > 5. The downsides of this proposal are:
> > >>> > > > > > > > >   a. The timestamp might not be monotonically increasing.
> > >>> > > > > > > > >   b. Log retention might become non-deterministic, i.e.
> > >>> > > > > > > > > when a message will be deleted now depends on the
> > >>> > > > > > > > > timestamps of the other messages in the same log segment.
> > >>> > > > > > > > > And those timestamps are provided by the user, within a
> > >>> > > > > > > > > range depending on what the time difference threshold
> > >>> > > > > > > > > configuration is.
> > >>> > > > > > > > >   c. The semantic meaning of the timestamp in the
> > >>> > > > > > > > > messages could be a little bit vague, because some of
> > >>> > > > > > > > > them come from the producer and some of them are
> > >>> > > > > > > > > overwritten by brokers.
> > >>> > > > > > > > >
> > >>> > > > > > > > > 6. Although the proposal has some downsides, it gives
> > >>> > > > > > > > > the user the flexibility to use the timestamp:
> > >>> > > > > > > > >   a. If the threshold is set to Long.MaxValue, the
> > >>> > > > > > > > > timestamp in the message is equivalent to CreateTime.
> > >>> > > > > > > > >   b. If the threshold is set to 0, the timestamp in the
> > >>> > > > > > > > > message is equivalent to LogAppendTime.
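> > >>> > > > > > > > >
> > >>> > > > > > > > > A sketch of the retention check described in 4.b (the
> > >>> > > > > > > > > segment and index types are hypothetical):
> > >>> > > > > > > > >
> > >>> > > > > > > > >     // Only the last time index entry matters: it holds the
> > >>> > > > > > > > >     // latest timestamp in the entire segment.
> > >>> > > > > > > > >     boolean shouldDelete(LogSegment segment, long retentionMs,
> > >>> > > > > > > > >                          long nowMs) {
> > >>> > > > > > > > >         long latest = segment.timeIndex().lastEntry().timestamp;
> > >>> > > > > > > > >         return nowMs - latest > retentionMs;
> > >>> > > > > > > > >     }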
> > >>> > > > > > > > >
> > >>> > > > > > > > > This proposal actually allows the user to use either
> > >>> > > > > > > > > CreateTime or LogAppendTime without introducing two
> > >>> > > > > > > > > timestamp concepts at the same time. I have updated the
> > >>> > > > > > > > > wiki for KIP-32 and KIP-33 with this proposal.
> > >>> > > > > > > > >
> > >>> > > > > > > > > One thing I am thinking is that instead of having a time
> > >>> > > > > > > > > difference threshold, should we simply have a
> > >>> > > > > > > > > TimestampType configuration? Because in most cases,
> > >>> > > > > > > > > people will either set the threshold to 0 or
> > >>> > > > > > > > > Long.MaxValue. Setting anything in between will make the
> > >>> > > > > > > > > timestamp in the message meaningless to the user - the
> > >>> > > > > > > > > user doesn't know if the timestamp has been overwritten
> > >>> > > > > > > > > by the brokers.
> > >>> > > > > > > > >
> > >>> > > > > > > > > Any thoughts?
> > >>> > > > > > > > >
> > >>> > > > > > > > > Thanks,
> > >>> > > > > > > > > Jiangjie (Becket) Qin
> > >>> > > > > > > > >
> > >>> > > > > > > > > On Mon, Dec 7, 2015 at 10:33 AM, Jiangjie Qin
> > >>> > > > > > > <j...@linkedin.com.invalid
> > >>> > > > > > > > >
> > >>> > > > > > > > > wrote:
> > >>> > > > > > > > >
> > >>> > > > > > > > >> Bump up this thread.
> > >>> > > > > > > > >>
> > >>> > > > > > > > >> Thanks,
> > >>> > > > > > > > >> Jiangjie (Becket) Qin
> > >>> > > > > > > > >>
> > >>> > > > > > > > >> On Mon, Oct 26, 2015 at 1:23 PM, Jiangjie Qin <
> > >>> > > > j...@linkedin.com>
> > >>> > > > > > > > wrote:
> > >>> > > > > > > > >>
> > >>> > > > > > > > >> > Hi Jay,
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > Thanks for such a detailed explanation. I think we
> > >>> > > > > > > > >> > both are trying to make CreateTime work for us if
> > >>> > > > > > > > >> > possible. To me, "work" means clear guarantees on:
> > >>> > > > > > > > >> > 1. Log retention time enforcement.
> > >>> > > > > > > > >> > 2. Log rolling time enforcement (this might be less of
> > >>> > > > > > > > >> > a concern, as you pointed out).
> > >>> > > > > > > > >> > 3. Applications searching for messages by time.
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > WRT (1), I agree the expectation for log retention
> > >>> > > > > > > > >> > might be different depending on who we ask. But my
> > >>> > > > > > > > >> > concern is about the level of guarantee we give to the
> > >>> > > > > > > > >> > user. My observation is that a clear guarantee to the
> > >>> > > > > > > > >> > user is critical regardless of the mechanism we
> > >>> > > > > > > > >> > choose. And this is the subtle but important
> > >>> > > > > > > > >> > difference between using LogAppendTime and CreateTime.
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > Let's say a user asks this question: how long will my
> > >>> > > > > > > > >> > message stay in Kafka?
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > If we use LogAppendTime for log retention, the answer
> > >>> > > > > > > > >> > is that the message will stay in Kafka for the
> > >>> > > > > > > > >> > retention time after the message is produced (to be
> > >>> > > > > > > > >> > more precise, upper bounded by log.rolling.ms +
> > >>> > > > > > > > >> > log.retention.ms - e.g. with one-day rolling and
> > >>> > > > > > > > >> > seven-day retention, a message is gone after at most
> > >>> > > > > > > > >> > eight days). The user has a clear guarantee and can
> > >>> > > > > > > > >> > decide whether or not to put the message into Kafka,
> > >>> > > > > > > > >> > or how to adjust the retention time according to their
> > >>> > > > > > > > >> > requirements.
> > >>> > > > > > > > >> > If we use create time for log retention, the answer
> > >>> > > > > > > > >> > would be "it depends". The best answer we can give is
> > >>> > > > > > > > >> > "at least retention.ms", because there is no guarantee
> > >>> > > > > > > > >> > when the messages will be deleted after that. If a
> > >>> > > > > > > > >> > message sits somewhere behind a larger create time,
> > >>> > > > > > > > >> > the message might stay longer than expected. But we
> > >>> > > > > > > > >> > don't know how much longer, because it depends on the
> > >>> > > > > > > > >> > create time. In this case, it is hard for the user to
> > >>> > > > > > > > >> > decide what to do.
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > I am worrying about this because a blurry guarantee
> > >>> > > > > > > > >> > has bitten us before, e.g. topic creation. We have
> > >>> > > > > > > > >> > received many questions like "why is my topic not
> > >>> > > > > > > > >> > there after I created it?". I can imagine we would
> > >>> > > > > > > > >> > receive similar questions asking "why is my message
> > >>> > > > > > > > >> > still there after the retention time has been
> > >>> > > > > > > > >> > reached?". So my understanding is that a clear and
> > >>> > > > > > > > >> > solid guarantee is better than a mechanism that works
> > >>> > > > > > > > >> > in most cases but occasionally does not.
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > If we think of the retention guarantee we provide with
> > >>> > > > > > > > >> > LogAppendTime, it is not broken as you said, because
> > >>> > > > > > > > >> > we are telling the user that log retention is NOT
> > >>> > > > > > > > >> > based on create time in the first place.
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > WRT (3), no matter whether we index on LogAppendTime
> > >>> > > > > > > > >> > or CreateTime, the best guarantee we can provide to
> > >>> > > > > > > > >> > the user is "no missing messages after a certain
> > >>> > > > > > > > >> > timestamp". Therefore I actually really like to index
> > >>> > > > > > > > >> > on CreateTime, because that is the timestamp we
> > >>> > > > > > > > >> > provide to the user, and we can have the solid
> > >>> > > > > > > > >> > guarantee.
> > >>> > > > > > > > >> > On the other hand, indexing on LogAppendTime while
> > >>> > > > > > > > >> > giving the user CreateTime does not provide a solid
> > >>> > > > > > > > >> > guarantee when users search by timestamp. It only
> > >>> > > > > > > > >> > works when LogAppendTime is always no earlier than
> > >>> > > > > > > > >> > CreateTime. This is a reasonable assumption and we can
> > >>> > > > > > > > >> > easily enforce it.
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > With the above, I am not sure if we can avoid a server
> > >>> > > > > > > > >> > timestamp and still make log retention work with a
> > >>> > > > > > > > >> > clear guarantee. For the searching-by-timestamp use
> > >>> > > > > > > > >> > case, I really want to have the index built on
> > >>> > > > > > > > >> > CreateTime. But with a reasonable assumption and
> > >>> > > > > > > > >> > timestamp enforcement, a LogAppendTime index would
> > >>> > > > > > > > >> > also work.
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > Thanks,
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > Jiangjie (Becket) Qin
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> > On Thu, Oct 22, 2015 at 10:48 AM, Jay Kreps <
> > >>> > > j...@confluent.io
> > >>> > > > >
> > >>> > > > > > > wrote:
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> >> Hey Becket,
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> Let me see if I can address your concerns:
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> 1. Let's say we have two source clusters that are
> > >>> > mirrored
> > >>> > > to
> > >>> > > > > the
> > >>> > > > > > > > same
> > >>> > > > > > > > >> >> > target cluster. For some reason one of the
> mirror
> > >>> maker
> > >>> > > > from
> > >>> > > > > a
> > >>> > > > > > > > cluster
> > >>> > > > > > > > >> >> dies
> > >>> > > > > > > > >> >> > and after fix the issue we want to resume
> > >>> mirroring. In
> > >>> > > > this
> > >>> > > > > > case
> > >>> > > > > > > > it
> > >>> > > > > > > > >> is
> > >>> > > > > > > > >> >> > possible that when the mirror maker resumes
> > >>> mirroring,
> > >>> > > the
> > >>> > > > > > > > timestamp
> > >>> > > > > > > > >> of
> > >>> > > > > > > > >> >> the
> > >>> > > > > > > > >> >> > messages have already gone beyond the
> acceptable
> > >>> > > timestamp
> > >>> > > > > > range
> > >>> > > > > > > on
> > >>> > > > > > > > >> >> broker.
> > >>> > > > > > > > >> >> > In order to let those messages go through, we
> > have
> > >>> to
> > >>> > > bump
> > >>> > > > up
> > >>> > > > > > the
> > >>> > > > > > > > >> >> > *max.append.delay
> > >>> > > > > > > > >> >> > *for all the topics on the target broker. This
> > >>> could be
> > >>> > > > > > painful.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> Actually what I was suggesting was different. Here
> > >>> > > > > > > > >> >> is my observation: clusters/topics directly produced
> > >>> > > > > > > > >> >> to by applications have a valid assertion that log
> > >>> > > > > > > > >> >> append time and create time are similar (let's call
> > >>> > > > > > > > >> >> these "unbuffered"); other clusters/topics, such as
> > >>> > > > > > > > >> >> those that receive data from a database, a log file,
> > >>> > > > > > > > >> >> or another Kafka cluster, don't have that assertion;
> > >>> > > > > > > > >> >> for these "buffered" clusters data can be arbitrarily
> > >>> > > > > > > > >> >> late. This means any use of log append time on these
> > >>> > > > > > > > >> >> buffered clusters is not very meaningful, and create
> > >>> > > > > > > > >> >> time and log append time "should" be similar on
> > >>> > > > > > > > >> >> unbuffered clusters, so you can probably use either.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> Using log append time on buffered clusters actually
> > >>> > > > > > > > >> >> results in bad things. If you request the offset for
> > >>> > > > > > > > >> >> a given time you don't end up getting data for that
> > >>> > > > > > > > >> >> time but rather data that showed up at that time. If
> > >>> > > > > > > > >> >> you try to retain 7 days of data it may mostly work,
> > >>> > > > > > > > >> >> but any kind of bootstrapping will result in
> > >>> > > > > > > > >> >> retaining much more (potentially the whole database
> > >>> > > > > > > > >> >> contents!).
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> So what I am suggesting in terms of the use of
> > >>> > > > > > > > >> >> max.append.delay is that unbuffered clusters would
> > >>> > > > > > > > >> >> have this set and buffered clusters would not. In
> > >>> > > > > > > > >> >> other words, in LI terminology, tracking and metrics
> > >>> > > > > > > > >> >> clusters would have this enforced; aggregate and
> > >>> > > > > > > > >> >> replica clusters wouldn't.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> So you DO have the issue of potentially retaining
> > >>> > > > > > > > >> >> more data than you need to on aggregate clusters if
> > >>> > > > > > > > >> >> your mirroring skews, but you DON'T need to tweak the
> > >>> > > > > > > > >> >> setting as you described.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> 2. Let's say in the above scenario we let the
> > >>> messages
> > >>> > in,
> > >>> > > at
> > >>> > > > > > that
> > >>> > > > > > > > point
> > >>> > > > > > > > >> >> > some log segments in the target cluster might
> > have
> > >>> a
> > >>> > wide
> > >>> > > > > range
> > >>> > > > > > > of
> > >>> > > > > > > > >> >> > timestamps, like Guozhang mentioned the log
> > rolling
> > >>> > could
> > >>> > > > be
> > >>> > > > > > > tricky
> > >>> > > > > > > > >> >> because
> > >>> > > > > > > > >> >> > the first time index entry does not necessarily
> > >>> have
> > >>> > the
> > >>> > > > > > smallest
> > >>> > > > > > > > >> >> timestamp
> > >>> > > > > > > > >> >> > of all the messages in the log segment.
> Instead,
> > >>> it is
> > >>> > > the
> > >>> > > > > > > largest
> > >>> > > > > > > > >> >> > timestamp ever seen. We have to scan the entire
> > >>> log to
> > >>> > > find
> > >>> > > > > the
> > >>> > > > > > > > >> message
> > >>> > > > > > > > >> >> > with smallest offset to see if we should roll.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> I think there are two uses for time-based log
> > >>> > > > > > > > >> >> rolling:
> > >>> > > > > > > > >> >> 1. Making the offset lookup by timestamp work.
> > >>> > > > > > > > >> >> 2. Ensuring we don't retain data indefinitely if it
> > >>> > > > > > > > >> >> is supposed to get purged after 7 days.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> But think about these two use cases. (1) is totally
> > >>> > > > > > > > >> >> obviated by the time=>offset index we are adding,
> > >>> > > > > > > > >> >> which yields much more granular offset lookups. (2)
> > >>> > > > > > > > >> >> is actually totally broken if you switch to append
> > >>> > > > > > > > >> >> time, right? If you want to be sure for
> > >>> > > > > > > > >> >> security/privacy reasons that you only retain 7 days
> > >>> > > > > > > > >> >> of data, then if the log append and create time
> > >>> > > > > > > > >> >> diverge you actually violate this requirement.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> I think 95% of people care about (1), which is
> > >>> > > > > > > > >> >> solved in the proposal, and (2) is actually broken
> > >>> > > > > > > > >> >> today as well as in both proposals.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> 3. Theoretically it is possible that an older log
> > >>> segment
> > >>> > > > > > contains
> > >>> > > > > > > > >> >> > timestamps that are older than all the messages
> > in
> > >>> a
> > >>> > > newer
> > >>> > > > > log
> > >>> > > > > > > > >> segment.
> > >>> > > > > > > > >> >> It
> > >>> > > > > > > > >> >> > would be weird that we are supposed to delete
> the
> > >>> newer
> > >>> > > log
> > >>> > > > > > > segment
> > >>> > > > > > > > >> >> before
> > >>> > > > > > > > >> >> > we delete the older log segment.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> The index timestamps would always be a lower bound
> > >>> > > > > > > > >> >> (i.e. the maximum at that time), so I don't think
> > >>> > > > > > > > >> >> that is possible.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >>  4. In bootstrap case, if we reload the data to a
> > >>> Kafka
> > >>> > > > > cluster,
> > >>> > > > > > we
> > >>> > > > > > > > have
> > >>> > > > > > > > >> >> to
> > >>> > > > > > > > >> >> > make sure we configure the topic correctly
> before
> > >>> we
> > >>> > load
> > >>> > > > the
> > >>> > > > > > > data.
> > >>> > > > > > > > >> >> > Otherwise the message might either be rejected
> > >>> because
> > >>> > > the
> > >>> > > > > > > > timestamp
> > >>> > > > > > > > >> is
> > >>> > > > > > > > >> >> too
> > >>> > > > > > > > >> >> > old, or it might be deleted immediately because
> > the
> > >>> > > > retention
> > >>> > > > > > > time
> > >>> > > > > > > > has
> > >>> > > > > > > > >> >> > reached.
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> See (1).
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> -Jay
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> On Tue, Oct 13, 2015 at 7:30 PM, Jiangjie Qin
> > >>> > > > > > > > <j...@linkedin.com.invalid
> > >>> > > > > > > > >> >
> > >>> > > > > > > > >> >> wrote:
> > >>> > > > > > > > >> >>
> > >>> > > > > > > > >> >> > Hey Jay and Guozhang,
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > Thanks a lot for the reply. So if I understand
> > >>> > > > > > > > >> >> > correctly, Jay's proposal is:
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > 1. Let the client stamp the message create time.
> > >>> > > > > > > > >> >> > 2. The broker builds the index based on the
> > >>> > > > > > > > >> >> > client-stamped message create time.
> > >>> > > > > > > > >> >> > 3. The broker only takes messages whose create time
> > >>> > > > > > > > >> >> > is within the current time plus/minus T (T is a
> > >>> > > > > > > > >> >> > configuration, *max.append.delay*, which could be a
> > >>> > > > > > > > >> >> > topic-level configuration); if the timestamp is out
> > >>> > > > > > > > >> >> > of this range, the broker rejects the message.
> > >>> > > > > > > > >> >> > 4. Because the create time of messages can be out
> > >>> > > > > > > > >> >> > of order, when the broker builds the time-based
> > >>> > > > > > > > >> >> > index it only provides the guarantee that if a
> > >>> > > > > > > > >> >> > consumer starts consuming from the offset returned
> > >>> > > > > > > > >> >> > by searching by timestamp t, they will not miss any
> > >>> > > > > > > > >> >> > message created after t, but might see some
> > >>> > > > > > > > >> >> > messages created before t.
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > To build the time-based index, every time the
> > >>> > > > > > > > >> >> > broker needs to insert a new time index entry, the
> > >>> > > > > > > > >> >> > entry will be {Largest_Timestamp_Ever_Seen ->
> > >>> > > > > > > > >> >> > Current_Offset}. This basically means any timestamp
> > >>> > > > > > > > >> >> > larger than Largest_Timestamp_Ever_Seen must come
> > >>> > > > > > > > >> >> > after this offset, because the broker never saw it
> > >>> > > > > > > > >> >> > before. So we don't miss any message with a larger
> > >>> > > > > > > > >> >> > timestamp.
> > >>> > > > > > > > >> >> >
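> > >>> > > > > > > > >> >> > A sketch of that insertion rule (the fields are
> > >>> > > > > > > > >> >> > hypothetical):
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> >     // Track the largest timestamp ever seen; each new
> > >>> > > > > > > > >> >> >     // entry maps it to the current offset, so any larger
> > >>> > > > > > > > >> >> >     // timestamp must appear at a later offset.
> > >>> > > > > > > > >> >> >     void maybeAppendTimeIndexEntry(long messageTimestamp,
> > >>> > > > > > > > >> >> >                                    long currentOffset) {
> > >>> > > > > > > > >> >> >         largestTimestampSeen =
> > >>> > > > > > > > >> >> >                 Math.max(largestTimestampSeen, messageTimestamp);
> > >>> > > > > > > > >> >> >         timeIndex.append(largestTimestampSeen, currentOffset);
> > >>> > > > > > > > >> >> >     }
> > >>> > > > > > > > >> >> >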
> > >>> > > > > > > > >> >> > (@Guozhang, in this case, for log retention we
> > >>> > > > > > > > >> >> > only need to take a look at the last time index
> > >>> > > > > > > > >> >> > entry, because it must hold the largest timestamp
> > >>> > > > > > > > >> >> > ever seen. If that timestamp is overdue, we can
> > >>> > > > > > > > >> >> > safely delete any log segment before that point, so
> > >>> > > > > > > > >> >> > we don't need to scan the log segment file for log
> > >>> > > > > > > > >> >> > retention.)
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > I assume that we are still going to have the new
> > >>> > > > > > > > >> >> > FetchRequest to allow time index replication for
> > >>> > > > > > > > >> >> > replicas.
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > I think Jay's main point here is that we don't
> > >>> > > > > > > > >> >> > want to have two timestamp concepts in Kafka, which
> > >>> > > > > > > > >> >> > I agree is a reasonable concern. And I also agree
> > >>> > > > > > > > >> >> > that create time is more meaningful than
> > >>> > > > > > > > >> >> > LogAppendTime for users. But I am not sure if
> > >>> > > > > > > > >> >> > making everything based on create time would work
> > >>> > > > > > > > >> >> > in all cases. Here are my questions about this
> > >>> > > > > > > > >> >> > approach:
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > 1. Let's say we have two source clusters that
> are
> > >>> > > mirrored
> > >>> > > > to
> > >>> > > > > > the
> > >>> > > > > > > > same
> > >>> > > > > > > > >> >> > target cluster. For some reason one of the
> mirror
> > >>> maker
> > >>> > > > from
> > >>> > > > > a
> > >>> > > > > > > > cluster
> > >>> > > > > > > > >> >> dies
> > >>> > > > > > > > >> >> > and after fix the issue we want to resume
> > >>> mirroring. In
> > >>> > > > this
> > >>> > > > > > case
> > >>> > > > > > > > it
> > >>> > > > > > > > >> is
> > >>> > > > > > > > >> >> > possible that when the mirror maker resumes
> > >>> mirroring,
> > >>> > > the
> > >>> > > > > > > > timestamp
> > >>> > > > > > > > >> of
> > >>> > > > > > > > >> >> the
> > >>> > > > > > > > >> >> > messages have already gone beyond the
> acceptable
> > >>> > > timestamp
> > >>> > > > > > range
> > >>> > > > > > > on
> > >>> > > > > > > > >> >> broker.
> > >>> > > > > > > > >> >> > In order to let those messages go through, we
> > have
> > >>> to
> > >>> > > bump
> > >>> > > > up
> > >>> > > > > > the
> > >>> > > > > > > > >> >> > *max.append.delay
> > >>> > > > > > > > >> >> > *for all the topics on the target broker. This
> > >>> could be
> > >>> > > > > > painful.
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > 2. Let's say in the above scenario we let the
> > >>> messages
> > >>> > > in,
> > >>> > > > at
> > >>> > > > > > > that
> > >>> > > > > > > > >> point
> > >>> > > > > > > > >> >> > some log segments in the target cluster might
> > have
> > >>> a
> > >>> > wide
> > >>> > > > > range
> > >>> > > > > > > of
> > >>> > > > > > > > >> >> > timestamps, like Guozhang mentioned the log
> > rolling
> > >>> > could
> > >>> > > > be
> > >>> > > > > > > tricky
> > >>> > > > > > > > >> >> because
> > >>> > > > > > > > >> >> > the first time index entry does not necessarily
> > >>> have
> > >>> > the
> > >>> > > > > > smallest
> > >>> > > > > > > > >> >> timestamp
> > >>> > > > > > > > >> >> > of all the messages in the log segment.
> Instead,
> > >>> it is
> > >>> > > the
> > >>> > > > > > > largest
> > >>> > > > > > > > >> >> > timestamp ever seen. We have to scan the entire
> > >>> log to
> > >>> > > find
> > >>> > > > > the
> > >>> > > > > > > > >> message
> > >>> > > > > > > > >> >> > with smallest offset to see if we should roll.
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > 3. Theoretically it is possible that an older
> log
> > >>> > segment
> > >>> > > > > > > contains
> > >>> > > > > > > > >> >> > timestamps that are older than all the messages
> > in
> > >>> a
> > >>> > > newer
> > >>> > > > > log
> > >>> > > > > > > > >> segment.
> > >>> > > > > > > > >> >> It
> > >>> > > > > > > > >> >> > would be weird that we are supposed to delete
> the
> > >>> newer
> > >>> > > log
> > >>> > > > > > > segment
> > >>> > > > > > > > >> >> before
> > >>> > > > > > > > >> >> > we delete the older log segment.
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > 4. In bootstrap case, if we reload the data to
> a
> > >>> Kafka
> > >>> > > > > cluster,
> > >>> > > > > > > we
> > >>> > > > > > > > >> have
> > >>> > > > > > > > >> >> to
> > >>> > > > > > > > >> >> > make sure we configure the topic correctly
> before
> > >>> we
> > >>> > load
> > >>> > > > the
> > >>> > > > > > > data.
> > >>> > > > > > > > >> >> > Otherwise the message might either be rejected
> > >>> because
> > >>> > > the
> > >>> > > > > > > > timestamp
> > >>> > > > > > > > >> is
> > >>> > > > > > > > >> >> too
> > >>> > > > > > > > >> >> > old, or it might be deleted immediately because
> > the
> > >>> > > > retention
> > >>> > > > > > > time
> > >>> > > > > > > > has
> > >>> > > > > > > > >> >> > reached.
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > I am very concerned about the operational
> > overhead
> > >>> and
> > >>> > > the
> > >>> > > > > > > > ambiguity
> > >>> > > > > > > > >> of
> > >>> > > > > > > > >> >> > guarantees we introduce if we purely rely on
> > >>> > CreateTime.
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > It looks to me that the biggest issue of
> adopting
> > >>> > > > CreateTime
> > >>> > > > > > > > >> everywhere
> > >>> > > > > > > > >> >> is
> > >>> > > > > > > > >> >> > CreateTime can have big gaps. These gaps could
> be
> > >>> > caused
> > >>> > > by
> > >>> > > > > > > several
> > >>> > > > > > > > >> >> cases:
> > >>> > > > > > > > >> >> > [1]. Faulty clients
> > >>> > > > > > > > >> >> > [2]. Natural delays from different source
> > >>> > > > > > > > >> >> > [3]. Bootstrap
> > >>> > > > > > > > >> >> > [4]. Failure recovery
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > Jay's alternative proposal solves [1], perhaps
> > >>> solve
> > >>> > [2]
> > >>> > > as
> > >>> > > > > > well
> > >>> > > > > > > > if we
> > >>> > > > > > > > >> >> are
> > >>> > > > > > > > >> >> > able to set a reasonable max.append.delay. But
> it
> > >>> does
> > >>> > > not
> > >>> > > > > seem
> > >>> > > > > > > > >> address
> > >>> > > > > > > > >> >> [3]
> > >>> > > > > > > > >> >> > and [4]. I actually doubt if [3] and [4] are
> > >>> solvable
> > >>> > > > because
> > >>> > > > > > it
> > >>> > > > > > > > looks
> > >>> > > > > > > > >> >> the
> > >>> > > > > > > > >> >> > CreateTime gap is unavoidable in those two
> cases.
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > Thanks,
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> > Jiangjie (Becket) Qin
> > >>> > > > > > > > >> >> >
> > >>> > > > > > > > >> >> >
On Tue, Oct 13, 2015 at 3:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Just to complete Jay's option, here is my understanding:

1. For log retention: if we want to remove data before time t, we look
into the index file of each segment and find the largest timestamp t' < t,
find the corresponding offset and start scanning from there to the end of
the segment. If there is no entry with timestamp >= t, we can delete this
segment; if a segment's smallest index timestamp is larger than t, we can
skip that segment entirely.

2. For log rolling: if we want to start a new segment after time t, we
look into the active segment's index file. If the largest timestamp there
is already > t, we can roll a new segment immediately; if it is < t, we
read its corresponding offset and start scanning to the end of the
segment, and if we find a record whose timestamp is > t, we can roll a new
segment.

For log rolling we only need to possibly scan a small portion of the
active segment, which should be fine; for log retention we may in the
worst case end up scanning all segments, but in practice we may skip most
of them since their smallest timestamp in the index file is larger than t.

Guozhang

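Sketched in Java, those two checks might look roughly like the following.
This is a hedged sketch only: the class and method names are invented, the
index is modeled as an in-memory map rather than an index file, and the
predicate stands in for scanning the segment from a given offset for any
record with timestamp >= t.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.function.LongPredicate;

    // Illustrative per-segment checks; not Kafka's actual classes.
    class TimeBasedSegmentChecks {
        private final NavigableMap<Long, Long> index = new TreeMap<>(); // timestamp -> offset
        private final long baseOffset;

        TimeBasedSegmentChecks(long baseOffset) { this.baseOffset = baseOffset; }

        // Retention: can this whole segment be deleted as older than t?
        boolean deletable(long t, LongPredicate hasRecordAtOrAfterT) {
            if (!index.isEmpty() && index.firstKey() > t)
                return false;               // smallest indexed timestamp > t: skip
            return !hasRecordAtOrAfterT.test(scanStart(t));
        }

        // Rolling: does the active segment already hold a record newer than t?
        boolean shouldRoll(long t, LongPredicate hasRecordAtOrAfterT) {
            if (!index.isEmpty() && index.lastKey() > t)
                return true;                // largest indexed timestamp already > t
            return hasRecordAtOrAfterT.test(scanStart(t));
        }

        // Offset of the largest indexed timestamp t' < t, or the segment start.
        private long scanStart(long t) {
            Map.Entry<Long, Long> e = index.lowerEntry(t);
            return e == null ? baseOffset : e.getValue();
        }
    }
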
On Tue, Oct 13, 2015 at 12:52 AM, Jay Kreps <j...@confluent.io> wrote:

I think it should be possible to index out-of-order timestamps. The
timestamp index would be similar to the offset index, a memory mapped file
appended to as part of the log append, but would have the format
  timestamp offset
The timestamp entries would be monotonic and, as with the offset index,
would be no more often than every 4k (or some configurable threshold to
keep the index small--actually for timestamps it could probably be much
more sparse than 4k).

A search for a timestamp t yields an offset o before which no prior
message has a timestamp >= t. In other words if you read the log starting
with o you are guaranteed not to miss any messages occurring at t or
later, though you may get many before t (due to out-of-orderness). Unlike
the offset index this bound doesn't really have to be tight (i.e. there is
probably no need to go search the log itself, though you could).

-Jay

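A toy in-memory rendering of that index and lookup rule, hedged: the real
structure would be a memory-mapped file, and all names here are invented
for illustration. Because each entry records the max timestamp seen so
far, the entries are monotonic by construction even when message
timestamps are out of order.

    import java.util.Map;
    import java.util.TreeMap;

    class SparseTimeIndex {
        private final TreeMap<Long, Long> entries = new TreeMap<>(); // maxTimestampSoFar -> offset
        private final long baseOffset;
        private long maxTimestampSoFar = Long.MIN_VALUE;
        private long bytesSinceLastEntry = 0;
        private static final long INDEX_INTERVAL_BYTES = 4096; // "every 4k"

        SparseTimeIndex(long baseOffset) { this.baseOffset = baseOffset; }

        // Called on every log append.
        void maybeAppend(long timestamp, long offset, int messageSizeBytes) {
            maxTimestampSoFar = Math.max(maxTimestampSoFar, timestamp);
            bytesSinceLastEntry += messageSizeBytes;
            if (bytesSinceLastEntry >= INDEX_INTERVAL_BYTES) {
                entries.put(maxTimestampSoFar, offset);
                bytesSinceLastEntry = 0;
            }
        }

        // Returns an offset o such that no message before o has timestamp >= t,
        // so reading from o cannot miss anything occurring at t or later.
        long lookup(long t) {
            Map.Entry<Long, Long> e = entries.lowerEntry(t); // last entry whose max < t
            return e == null ? baseOffset : e.getValue() + 1;
        }
    }
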
On Tue, Oct 13, 2015 at 12:32 AM, Jay Kreps <j...@confluent.io> wrote:

Here's my basic take:
- I agree it would be nice to have a notion of time baked in if it were
done right
- All the proposals so far seem pretty complex--I think they might make
things worse rather than better overall
- I think adding 2x8 byte timestamps to the message is probably a
non-starter from a size perspective
- Even if it isn't in the message, having two notions of time that control
different things is a bit confusing
- The mechanics of basing retention etc on log append time when that's not
in the log seem complicated

To that end here is a possible 4th option. Let me know what you think.

The basic idea is that the message creation time is closest to what the
user actually cares about, but is dangerous if set wrong. So rather than
substitute another notion of time, let's try to ensure the correctness of
message creation time by preventing arbitrarily bad message creation
times.

First, let's see if we can agree that log append time is not something
anyone really cares about but rather an implementation detail. The
timestamp that matters to the user is when the message occurred (the
creation time). The log append time is basically just an approximation to
this, on the assumption that the message creation and the message receive
on the server occur pretty close together.
But as these values diverge the issue starts to become apparent. Say you
set the retention to one week and then mirror data from a topic containing
two years of retention. Your intention is clearly to keep the last week,
but because the mirroring is appending right now you will keep two years.

The reason we are liking log append time is because we are (justifiably)
concerned that in certain situations the creation time may not be
trustworthy. This same problem exists on the servers, but there are fewer
servers and they just run the kafka code, so it is less of an issue.

There are two possible ways to handle this:

   1. Just tell people to add size based retention. I think this is not
   entirely unreasonable: we're basically saying we retain data based on the
   timestamp you give us in the data. If you give us bad data we will retain
   it for a bad amount of time. If you want to ensure we don't retain "too
   much" data, define "too much" by setting a size-based retention setting.
   This is not entirely unreasonable but kind of suffers from a "one bad
   apple" problem in a very large environment.
   2. Prevent bad timestamps. In general we can't say a timestamp is bad.
   However the definition we're implicitly using is that we think there are a
   set of topics/clusters where the creation timestamp should always be "very
   close" to the log append timestamp. This is true for data sources that
   have no buffering capability (which at LinkedIn is very common, but is
   more rare elsewhere). The solution in this case would be to allow a
   setting along the lines of max.append.delay which checks the creation
   timestamp against the server time to look for too large a divergence. The
   response would either be to reject the message or to override it with the
   server time (see the sketch just below).
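A sketch of what that option-2 check could look like on the broker,
hedged: max.append.delay is the config name floated above, but the class
and policy names here are invented for illustration, not Kafka code.

    // Broker-side creation-timestamp validation, per option 2 above.
    final class TimestampValidator {
        enum Policy { REJECT, OVERRIDE_WITH_SERVER_TIME }

        private final long maxAppendDelayMs; // e.g. 10 minutes for unbuffered clusters
        private final Policy policy;

        TimestampValidator(long maxAppendDelayMs, Policy policy) {
            this.maxAppendDelayMs = maxAppendDelayMs;
            this.policy = policy;
        }

        // Returns the timestamp to append, or throws if the message is rejected.
        long validate(long createTime) {
            long now = System.currentTimeMillis();
            if (Math.abs(now - createTime) <= maxAppendDelayMs)
                return createTime;      // close enough to server time: trust it
            if (policy == Policy.OVERRIDE_WITH_SERVER_TIME)
                return now;             // override the divergent creation time
            throw new IllegalArgumentException(
                "Create time " + createTime + " diverges from server time "
                + now + " by more than " + maxAppendDelayMs + " ms");
        }
    }
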
So in LI's environment you would configure the clusters used for direct,
unbuffered message production (e.g. tracking and metrics local) to enforce
a reasonably aggressive timestamp bound (say 10 mins), and all other
clusters would just inherit these.

The downside of this approach is requiring the special configuration.
However I think in 99% of environments this could be skipped entirely;
it's only when the ratio of clients to servers gets so massive that you
need to do this. The primary upside is that you have a single
authoritative notion of time which is closest to what a user would want
and is stored directly in the message.

I'm also assuming there is a workable approach for indexing non-monotonic
timestamps, though I haven't actually worked that out.

-Jay

On Mon, Oct 5, 2015 at 8:52 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

Bumping up this thread, although most of the discussion was on the
discussion thread of KIP-31 :)

I just updated the KIP page to add a detailed solution for the option
(Option 3) that does not expose the LogAppendTime to users.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+CreateTime+and+LogAppendTime+to+Kafka+message

The option has a minor change to the fetch request to allow fetching a
time index entry as well. I kind of like this solution because it's just
doing what we need without introducing other things.

It will be great to hear your feedback. I can explain more during
tomorrow's KIP hangout.

Thanks,

Jiangjie (Becket) Qin

On Thu, Sep 10, 2015 at 2:47 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi Jay,

I just copy/pasted here your feedback on the timestamp proposal that was
in the discussion thread of KIP-31. Please see the replies inline. The
main change I made compared with the previous proposal is to add both
CreateTime and LogAppendTime to the message.

On Tue, Sep 8, 2015 at 10:57 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Beckett,
>
> I was proposing splitting up the KIP just for simplicity of discussion.
> You can still implement them in one patch. I think otherwise it will be
> hard to discuss/vote on them, since if you like the offset proposal but
> not the time proposal, what do you do?
>
> Introducing a second notion of time into Kafka is a pretty massive
> philosophical change, so it kind of warrants its own KIP; I think it
> isn't just "Change message format".
> WRT time I think one thing to clarify in the proposal is how MM will
> have access to set the timestamp? Presumably this will be a new field in
> ProducerRecord, right? If so then any user can set the timestamp, right?
> I'm not sure you answered the questions around how this will work for
> MM, since when MM retains timestamps from multiple partitions they will
> then be out of order and in the past (so the max(lastAppendedTimestamp,
> currentTimeMillis) override you proposed will not work, right?). If we
> don't do this then when you set up mirroring the data will all be new
> and you have the same retention problem you described. Maybe I missed
> something...?
lastAppendedTimestamp means the timestamp of the message that was last
appended to the log.

If a broker is a leader, since it assigns the timestamp by itself, the
lastAppendedTimestamp will be its local clock at the time it appended the
last message. So if there is no leader migration,
max(lastAppendedTimestamp, currentTimeMillis) = currentTimeMillis.

If a broker is a follower, because it keeps the leader's timestamp
unchanged, the lastAppendedTimestamp would be the leader's clock when it
appended that message. The follower keeps track of the
lastAppendedTimestamp only in case it becomes leader later on. At that
point, it is possible that the timestamp of the last appended message was
stamped by the old leader, but the new leader's currentTimeMillis <
lastAppendedTimestamp. If a new message comes, instead of stamping it with
the new leader's currentTimeMillis, we have to stamp it with
lastAppendedTimestamp to avoid the timestamps in the log going backward.

The max(lastAppendedTimestamp, currentTimeMillis) is purely based on the
broker side clock. If MM produces messages with different LogAppendTimes
from several source clusters to the same target cluster, the
LogAppendTime will be ignored and re-stamped by the target cluster.

I added a use case example for mirror maker in KIP-32. Also there is a
corner case discussion about when we need the max(lastAppendedTime,
currentTimeMillis) trick. Could you take a look and see if that answers
your question?
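A compact sketch of that stamping rule, hedged: illustrative names only,
not Kafka code.

    // A leader never lets the log's timestamps go backwards, even right
    // after taking over from a leader whose clock ran ahead of its own.
    class LeaderTimestampAssigner {
        private long lastAppendedTimestamp = Long.MIN_VALUE;

        // Leader path: assign the LogAppendTime for a newly produced message.
        long assign(long serverNowMs) {
            lastAppendedTimestamp = Math.max(lastAppendedTimestamp, serverNowMs);
            return lastAppendedTimestamp;
        }

        // Follower path: keep the leader's timestamp unchanged, but remember
        // it in case this replica is elected leader later.
        void onReplicatedAppend(long leaderAssignedTimestamp) {
            lastAppendedTimestamp =
                Math.max(lastAppendedTimestamp, leaderAssignedTimestamp);
        }
    }
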
> My main motivation is that given that both Samza and Kafka streams are
> doing work that implies a mandatory client-defined notion of time, I
> really think introducing a different mandatory notion of time in Kafka
> is going to be quite odd. We should think hard about how client-defined
> time could work. I'm not sure if it can, but I'm also not sure that it
> can't. Having both will be odd. Did you chat about this with Yi/Kartik
> on the Samza side?
I talked with Kartik and realized that it would be useful to have a
client timestamp to facilitate use cases like stream processing. I was
trying to figure out if we can simply use the client timestamp without
introducing the server time. There is some discussion of this in the KIP.

The key problems we want to solve here are:
1. We want log retention and rolling to depend on the server clock.
2. We want to make sure the log-associated timestamp is retained when
replicas move.
3. We want to use the timestamp in some way that allows searching by
timestamp (a client-side sketch follows below).

For 1 and 2, an alternative is to pass the log-associated timestamp
through replication. That means we need a different protocol for replica
fetching that passes the log-associated timestamp. It is actually
complicated and there could be a lot of corner cases to handle, e.g. if
an old leader starts to fetch from the new leader, should it also update
the timestamps of all of its old log segments?

I think client side timestamps would actually be better for 3 if we can
find a way to make them work. So far I am not able to convince myself
that only having client side timestamps would work, mainly because of 1
and 2. There are a few situations I mentioned in the KIP.
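For a sense of what goal 3 looks like from the client side, here is a
hedged sketch against the offsetsForTimes call that Kafka consumers
eventually gained (it assumes a Kafka clients library of 0.10.1 or later,
which postdates this thread; the class and names around it are
placeholders).

    import java.time.Instant;
    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    // Rewind a consumer to the first offset at or after a target timestamp.
    class SeekByTime {
        static void seekTo(KafkaConsumer<String, String> consumer,
                           TopicPartition tp, Instant target) {
            Map<TopicPartition, OffsetAndTimestamp> found =
                consumer.offsetsForTimes(
                    Collections.singletonMap(tp, target.toEpochMilli()));
            OffsetAndTimestamp ot = found.get(tp);
            if (ot != null)
                consumer.seek(tp, ot.offset()); // earliest offset with ts >= target
        }
    }
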
> When you are saying it won't work, you are assuming some particular
> implementation? Maybe that the index is a monotonically increasing set
> of pointers to the least record with a timestamp larger than the index
> time? In other words a search for time X gives the largest offset at
> which all records are <= X?
It is a promising idea. We probably can have an in-memory index like
that, but it might be complicated to have a file on disk like that.
Imagine there are two timestamps T0 < T1. We see message Y created at T1
and create an index like [T1->Y]; then we see message X created at T0.
Supposedly we should now have an index that looks like [T0->X, T1->Y].
That is easy to do in memory, but we might have to rewrite the index file
completely. Maybe we can have the first entry with timestamp 0, and only
update that first pointer for any out-of-range timestamp, so the index
becomes [0->X, T1->Y]. Also, the ranges of timestamps in the log segments
can overlap with each other. That means we either need to keep a
cross-segment index file or we need to check the index file of every log
segment.

I separated out the time based log index to KIP-33 because it can be an
independent follow up feature, as Neha suggested. I will try to make the
time based index work with client side timestamps.
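A toy version of that sentinel-entry idea, hedged: the names are invented
and an in-memory map stands in for the on-disk file. As the message above
notes, overlapping timestamp ranges across segments would still
complicate lookups.

    import java.util.TreeMap;

    // The index stays append-only except for a sentinel entry at timestamp
    // 0, which is re-pointed whenever an out-of-range timestamp arrives.
    class FirstEntryPatchedIndex {
        private final TreeMap<Long, Long> entries = new TreeMap<>(); // timestamp -> offset

        void add(long timestamp, long offset) {
            if (entries.isEmpty() || timestamp >= entries.lastKey()) {
                entries.put(timestamp, offset); // in-order: normal append
            } else {
                // Out-of-order, e.g. T0 arriving after [T1->Y]: instead of
                // rewriting the file as [T0->X, T1->Y], patch the sentinel,
                // yielding [0->X, T1->Y].
                entries.put(0L, offset);
            }
        }
    }
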
> For retention, I agree with the problem you point out, but I think what
> you are saying in that case is that you want a size limit too. If you
> use system time you actually hit the same problem: say you do a full
> dump of a DB table with a setting of 7 days retention; your retention
> will actually not get enforced for the first 7 days because the data is
> "new to Kafka".
I kind of think the size limit here is orthogonal. It is a valid use case
where people only want to use time based retention. In your example,
depending on the client timestamp might break the functionality: say it
is a bootstrap case where people actually need to read all the data. If
we depend on the client timestamp, the data might be deleted instantly
when it comes to the broker. It might be too demanding to expect the
broker to understand what people actually want to do with the data coming
in. So the guarantee of using a server side timestamp is that "after
being appended to the log, all messages will be available on the broker
for the retention time", which is not changeable by clients.

> -Jay

On Thu, Sep 10, 2015 at 12:55 PM, Jiangjie Qin <j...@linkedin.com> wrote:

Hi folks,

This proposal was previously in KIP-31 and we separated it to KIP-32 per
Neha and Jay's suggestion.

The proposal is to add the following two timestamps to the Kafka message:
- CreateTime
- LogAppendTime

The CreateTime will be set by the producer and will not change after
that. The LogAppendTime will be set by the broker for purposes such as
enforcing log retention and log rolling.

Thanks,

Jiangjie (Becket) Qin
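A schematic of that two-timestamp message, hedged: the field names follow
the proposal, but the class itself is purely illustrative and not the
actual wire format.

    // The two timestamps this proposal adds to each message.
    final class TimestampedMessage {
        final long createTime;    // set by the producer; never changed afterwards
        final long logAppendTime; // set by the broker; drives retention and rolling
        final byte[] key;
        final byte[] value;

        TimestampedMessage(long createTime, long logAppendTime,
                           byte[] key, byte[] value) {
            this.createTime = createTime;
            this.logAppendTime = logAppendTime;
            this.key = key;
            this.value = value;
        }
    }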