Hi, Dong,

The current KIP looks good to me.

Thanks,

Jun

On Tue, Jan 23, 2018 at 12:29 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Do you think the current KIP looks OK? I am wondering if we can open the
> voting thread.
>
> Thanks!
> Dong
>
> On Fri, Jan 19, 2018 at 3:08 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > I think we can probably have a static method in a Util class to decode the
> > byte[]. Both the KafkaConsumer implementation and the user application will
> > be able to decode the byte array and log its content for debugging purposes.
> > So it seems that we can still print the information we want; it is just not
> > explicitly exposed in the consumer interface. Would this address the problem
> > here?
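> >
> > For reference, here is a minimal sketch (not from the KIP) of the kind of
> > static helper discussed above. The class name, method name and the exact
> > byte layout are illustrative only; it assumes the opaque byte[] carries a
> > version, a partition_epoch and a leader_epoch:
> >
> > import java.nio.ByteBuffer;
> >
> > public final class OffsetEpochUtil {
> >     // Decode the opaque offset epoch into a readable string for logging.
> >     public static String toLogString(byte[] offsetEpoch) {
> >         ByteBuffer buf = ByteBuffer.wrap(offsetEpoch);
> >         short version = buf.getShort();
> >         int partitionEpoch = buf.getInt();
> >         int leaderEpoch = buf.getInt();
> >         return "OffsetEpoch(version=" + version
> >             + ", partitionEpoch=" + partitionEpoch
> >             + ", leaderEpoch=" + leaderEpoch + ")";
> >     }
> > }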
> >
> > Yeah we can include OffsetEpoch in AdminClient. This can be added in
> > KIP-222? Is there something you would like me to add in this KIP?
> >
> > Thanks!
> > Dong
> >
> > On Fri, Jan 19, 2018 at 3:00 PM, Jun Rao <j...@confluent.io> wrote:
> >
> >> Hi, Dong,
> >>
> >> The issue with using just byte[] for OffsetEpoch is that it won't be
> >> printable, which makes debugging harder.
> >>
> >> Also, KIP-222 proposes a listGroupOffset() method in AdminClient. If
> that
> >> gets adopted before this KIP, we probably want to include OffsetEpoch in
> >> the AdminClient too.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Thu, Jan 18, 2018 at 6:30 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >> > Hey Jun,
> >> >
> >> > I agree. I have updated the KIP to remove the class OffsetEpoch and
> >> > replace OffsetEpoch with byte[] in the APIs that use it. Can you see if it
> >> > looks good?
> >> >
> >> > Thanks!
> >> > Dong
> >> >
> >> > On Thu, Jan 18, 2018 at 6:07 PM, Jun Rao <j...@confluent.io> wrote:
> >> >
> >> > > Hi, Dong,
> >> > >
> >> > > Thanks for the updated KIP. It looks good to me now. The only thing
> is
> >> > > for OffsetEpoch.
> >> > > If we expose the individual fields in the class, we probably don't need
> >> > > the encode/decode methods. If we want to hide the details of OffsetEpoch,
> >> > > we probably don't want to expose the individual fields.
> >> > >
> >> > > Jun
> >> > >
> >> > > On Wed, Jan 17, 2018 at 10:10 AM, Dong Lin <lindon...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Thinking about point 61 more, I realize that the async zookeeper
> >> read
> >> > may
> >> > > > make it less of an issue for controller to read more zookeeper
> >> nodes.
> >> > > > Writing partition_epoch in the per-partition znode makes it
> simpler
> >> to
> >> > > > handle the broker failure between zookeeper writes for a topic
> >> > creation.
> >> > > I
> >> > > > have updated the KIP to use the suggested approach.
> >> > > >
> >> > > >
> >> > > > On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin <lindon...@gmail.com>
> >> wrote:
> >> > > >
> >> > > > > Hey Jun,
> >> > > > >
> >> > > > > Thanks much for the comments. Please see my comments inline.
> >> > > > >
> >> > > > > On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io>
> >> wrote:
> >> > > > >
> >> > > > >> Hi, Dong,
> >> > > > >>
> >> > > > >> Thanks for the updated KIP. Looks good to me overall. Just a
> few
> >> > minor
> >> > > > >> comments.
> >> > > > >>
> >> > > > >> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition
> >> > > partition):
> >> > > > >> It
> >> > > > >> seems that there is no need to return metadata. We probably
> want
> >> to
> >> > > > return
> >> > > > >> sth like OffsetAndEpoch.
> >> > > > >>
> >> > > > >
> >> > > > > Previously I thought we may want to re-use the existing class to keep
> >> > > > > our consumer interface simpler. I have updated the KIP to add the class
> >> > > > > OffsetAndOffsetEpoch. I didn't use OffsetAndEpoch because users may
> >> > > > > confuse this name with OffsetEpoch. Does this sound OK?
> >> > > > >
> >> > > > >
> >> > > > >>
> >> > > > >> 61. Should we store partition_epoch in
> >> > > > >> /brokers/topics/[topic]/partitions/[partitionId] in ZK?
> >> > > > >>
> >> > > > >
> >> > > > > I have considered this. I think the advantage of adding the
> >> > > > > partition->partition_epoch map in the existing znode
> >> > > > > /brokers/topics/[topic]/partitions is that the controller only needs to
> >> > > > > read one znode per topic to get its partition_epoch information.
> >> > > > > Otherwise the controller may need to read one extra znode per partition
> >> > > > > to get the same information.
> >> > > > >
> >> > > > > When we delete a partition or expand the partitions of a topic,
> >> > > > > someone needs to modify the partition->partition_epoch map in the znode
> >> > > > > /brokers/topics/[topic]/partitions. This may seem a bit more
> >> > > > > complicated than simply adding or deleting the znode
> >> > > > > /brokers/topics/[topic]/partitions/[partitionId]. But the complexity is
> >> > > > > probably similar to the existing operation of modifying the
> >> > > > > partition->replica_list mapping in the znode /brokers/topics/[topic].
> >> > > > > So I am not sure it is better to store the partition_epoch in
> >> > > > > /brokers/topics/[topic]/partitions/[partitionId]. What do you think?
> >> > > > >
> >> > > > >
> >> > > > >>
> >> > > > >> 62. For checking outdated metadata in the client, we probably
> >> want
> >> > to
> >> > > > add
> >> > > > >> when max_partition_epoch will be used.
> >> > > > >>
> >> > > > >
> >> > > > > The max_partition_epoch is used in the Proposed Changes -> Client's
> >> > > > > metadata refresh section to determine whether the metadata is outdated.
> >> > > > > And this formula is referenced and re-used in other sections to
> >> > > > > determine whether the metadata is outdated. Does this formula look OK?
> >> > > > >
> >> > > > >
> >> > > > >>
> >> > > > >> 63. "The leader_epoch should be the largest leader_epoch of
> >> messages
> >> > > > whose
> >> > > > >> offset < the commit offset. If no message has been consumed
> since
> >> > > > consumer
> >> > > > >> initialization, the leader_epoch from seek(...) or
> >> > OffsetFetchResponse
> >> > > > >> should be used. The partition_epoch should be read from the
> last
> >> > > > >> FetchResponse corresponding to the given partition and commit
> >> > offset.
> >> > > ":
> >> > > > >> leader_epoch and partition_epoch are associated with an offset.
> >> So,
> >> > if
> >> > > > no
> >> > > > >> message is consumed, there is no offset and therefore there is
> no
> >> > need
> >> > > > to
> >> > > > >> read leader_epoch and partition_epoch. Also, the leader_epoch
> >> > > associated
> >> > > > >> with the offset should just come from the messages returned in
> >> the
> >> > > fetch
> >> > > > >> response.
> >> > > > >>
> >> > > > >
> >> > > > > I am thinking that, if the user calls seek(..) and commitSync(...)
> >> > > > > without consuming any messages, we should re-use the leader_epoch and
> >> > > > > partition_epoch provided by seek(...) in the OffsetCommitRequest. And
> >> > > > > if messages have been successfully consumed, then the leader_epoch will
> >> > > > > come from the messages returned in the fetch response. The condition
> >> > > > > "messages whose offset < the commit offset" is needed to take care of
> >> > > > > log compacted topics, which may have offset gaps due to log cleaning.
> >> > > > >
> >> > > > > Did I miss something here? Or should I rephrase the paragraph to make
> >> > > > > it less confusing?
> >> > > > >
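> >> > > > > As a rough sketch (not the KIP's actual code) of the rule above, the
> >> > > > > leader_epoch placed in the OffsetCommitRequest could be chosen like
> >> > > > > this, where the parameter names are hypothetical bookkeeping that the
> >> > > > > consumer would maintain per partition:
> >> > > > >
> >> > > > > // lastConsumedLeaderEpoch: largest leader_epoch among consumed messages
> >> > > > > // whose offset < the commit offset, or -1 if nothing has been consumed;
> >> > > > > // fallbackEpoch: the leader_epoch from seek(...) or OffsetFetchResponse.
> >> > > > > static int leaderEpochForCommit(int lastConsumedLeaderEpoch, int fallbackEpoch) {
> >> > > > >     return lastConsumedLeaderEpoch >= 0 ? lastConsumedLeaderEpoch : fallbackEpoch;
> >> > > > > }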
> >> > > > >
> >> > > > >> 64. Could you include the public methods in the OffsetEpoch
> >> class?
> >> > > > >>
> >> > > > >
> >> > > > > I mistakenly deleted the definition of OffsetEpoch class from
> the
> >> > KIP.
> >> > > I
> >> > > > > just added it back with the public methods. Could you take
> another
> >> > > look?
> >> > > > >
> >> > > > >
> >> > > > >>
> >> > > > >> Jun
> >> > > > >>
> >> > > > >>
> >> > > > >> On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindon...@gmail.com
> >
> >> > > wrote:
> >> > > > >>
> >> > > > >> > Hey Jun,
> >> > > > >> >
> >> > > > >> > Thanks much. I agree that we cannot rely on committed offsets always
> >> > > > >> > being deleted when we delete a topic. So it is necessary to use a
> >> > > > >> > per-partition epoch that does not change unless this partition is
> >> > > > >> > deleted. I also agree that it is very nice to be able to uniquely
> >> > > > >> > identify a message with (offset, leader_epoch, partition_epoch) in
> >> > > > >> > the face of potential topic deletion and unclean leader election.
> >> > > > >> >
> >> > > > >> > I agree with all your comments. And I have updated the KIP
> >> based
> >> > on
> >> > > > our
> >> > > > >> > latest discussion. In addition, I added
> >> > > InvalidPartitionEpochException
> >> > > > >> > which will be thrown by consumer.poll() if the
> partition_epoch
> >> > > > >> associated
> >> > > > >> > with the partition, which can be given to consumer using
> >> > seek(...),
> >> > > is
> >> > > > >> > different from the partition_epoch in the FetchResponse.
> >> > > > >> >
> >> > > > >> > Can you take another look at the latest KIP?
> >> > > > >> >
> >> > > > >> > Thanks!
> >> > > > >> > Dong
> >> > > > >> >
> >> > > > >> >
> >> > > > >> >
> >> > > > >> > On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <j...@confluent.io>
> >> > wrote:
> >> > > > >> >
> >> > > > >> > > Hi, Dong,
> >> > > > >> > >
> >> > > > >> > > My replies are the following.
> >> > > > >> > >
> >> > > > >> > > 60. What you described could also work. The drawback is
> that
> >> we
> >> > > will
> >> > > > >> be
> >> > > > >> > > unnecessarily changing the partition epoch when a partition
> >> > hasn't
> >> > > > >> really
> >> > > > >> > > changed. I was imagining that the partition epoch will be
> >> stored
> >> > > in
> >> > > > >> > > /brokers/topics/[topic]/partitions/[partitionId], instead
> >> of at
> >> > > the
> >> > > > >> > topic
> >> > > > >> > > level. So, not sure if ZK size limit is an issue.
> >> > > > >> > >
> >> > > > >> > > 61, 62 and 65. To me, the offset + offset_epoch is a unique
> >> > > > identifier
> >> > > > >> > for
> >> > > > >> > > a message. So, if a message hasn't changed, the offset and
> >> the
> >> > > > >> associated
> >> > > > >> > > offset_epoch ideally should remain the same (it will be
> kind
> >> of
> >> > > > weird
> >> > > > >> if
> >> > > > >> > > two consumer apps save the offset on the same message, but
> >> the
> >> > > > >> > offset_epoch
> >> > > > >> > > are different). partition_epoch + leader_epoch give us
> that.
> >> > > > >> > global_epoch +
> >> > > > >> > > leader_epoch don't. If we use this approach, we can solve
> not
> >> > only
> >> > > > the
> >> > > > >> > > problem that you have identified, but also other problems
> >> when
> >> > > there
> >> > > > >> is
> >> > > > >> > > data loss or topic re-creation more reliably. For example,
> in
> >> > the
> >> > > > >> future,
> >> > > > >> > > if we include the partition_epoch and leader_epoch in the
> >> fetch
> >> > > > >> request,
> >> > > > >> > > the server can do a more reliable check of whether that
> >> offset
> >> > is
> >> > > > >> valid
> >> > > > >> > or
> >> > > > >> > > not. I am not sure that we can rely upon all external
> >> offsets to
> >> > > be
> >> > > > >> > removed
> >> > > > >> > > on topic deletion. For example, a topic may be deleted by
> an
> >> > admin
> >> > > > who
> >> > > > >> > may
> >> > > > >> > > not know all the applications.
> >> > > > >> > >
> >> > > > >> > > If we agree on the above, the second question is then how
> to
> >> > > > reliably
> >> > > > >> > > propagate the partition_epoch and the leader_epoch to the
> >> > consumer
> >> > > > >> when
> >> > > > >> > > there are leader or partition changes. The leader_epoch
> comes
> >> > from
> >> > > > the
> >> > > > >> > > message, which is reliable. So, I was suggesting that when we
> >> > > > >> > > store an offset, we can just store the leader_epoch from the
> >> > > > >> > > message set containing that offset. Similarly, I was thinking that
> >> > > > >> > > if the partition_epoch is in the fetch response, we can propagate
> >> > > > >> > > the partition_epoch reliably when there is a partition_epoch
> >> > > > >> > > change.
> >> > > > >> > >
> >> > > > >> > > 63. My point is that once a leader is producing a message
> in
> >> the
> >> > > new
> >> > > > >> > > partition_epoch, ideally, we should associate the new
> offsets
> >> > with
> >> > > > the
> >> > > > >> > new
> >> > > > >> > > partition_epoch. Otherwise, the offset_epoch won't be the
> >> > correct
> >> > > > >> unique
> >> > > > >> > > identifier (useful for solving other problems mentioned
> >> above).
> >> > I
> >> > > > was
> >> > > > >> > > originally thinking that the leader will include the
> >> > > partition_epoch
> >> > > > >> in
> >> > > > >> > the
> >> > > > >> > > metadata cache in the fetch response. It's just that right
> >> now,
> >> > > > >> metadata
> >> > > > >> > > cache is updated on UpdateMetadataRequest, which typically
> >> > happens
> >> > > > >> after
> >> > > > >> > > the LeaderAndIsrRequest. Another approach is for the leader
> >> to
> >> > > cache
> >> > > > >> the
> >> > > > >> > > partition_epoch in the Partition object and return that
> >> (instead
> >> > > of
> >> > > > >> the
> >> > > > >> > one
> >> > > > >> > > in metadata cache) in the fetch response.
> >> > > > >> > >
> >> > > > >> > > 65. It seems to me that the global_epoch and the
> >> partition_epoch
> >> > > > have
> >> > > > >> > > different purposes. A partition_epoch has the benefit that
> it
> >> > (1)
> >> > > > can
> >> > > > >> be
> >> > > > >> > > used to form a unique identifier for a message and (2) can
> be
> >> > used
> >> > > > to
> >> > > > >> > > solve other
> >> > > > >> > > corner case problems in the future. I am not sure having
> >> just a
> >> > > > >> > > global_epoch can achieve these. global_epoch is useful to
> >> > > determine
> >> > > > >> which
> >> > > > >> > > version of the metadata is newer, especially with topic
> >> > deletion.
> >> > > > >> > >
> >> > > > >> > > Thanks,
> >> > > > >> > >
> >> > > > >> > > Jun
> >> > > > >> > >
> >> > > > >> > > On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <
> >> lindon...@gmail.com>
> >> > > > >> wrote:
> >> > > > >> > >
> >> > > > >> > > > Regarding the use of the global epoch in 65), it is very
> >> > similar
> >> > > > to
> >> > > > >> the
> >> > > > >> > > > proposal of the metadata_epoch we discussed earlier. The
> >> main
> >> > > > >> > difference
> >> > > > >> > > is
> >> > > > >> > > > that this epoch is incremented when we
> create/expand/delete
> >> > > topic
> >> > > > >> and
> >> > > > >> > > does
> >> > > > >> > > > not change when controller re-send metadata.
> >> > > > >> > > >
> >> > > > >> > > > I looked at our previous discussion. It seems that we
> >> prefer
> >> > > > >> > > > partition_epoch over the metadata_epoch because 1) we
> >> prefer
> >> > not
> >> > > > to
> >> > > > >> > have
> >> > > > >> > > an
> >> > > > >> > > > ever growing metadata_epoch and 2) we can reset offset
> >> better
> >> > > when
> >> > > > >> > topic
> >> > > > >> > > is
> >> > > > >> > > > re-created. The use of the global topic_epoch avoids the
> >> > > > >> > > > drawback of a quickly growing metadata_epoch. Though the global
> >> > > > >> > > > epoch does not allow
> >> > > > >> > > > us to recognize the invalid offset committed before the
> >> topic
> >> > > > >> > > re-creation,
> >> > > > >> > > > we can probably just delete the offset when we delete a
> >> topic.
> >> > > > Thus
> >> > > > >> I
> >> > > > >> > am
> >> > > > >> > > > not very sure whether it is still worthwhile to have a
> >> > > > per-partition
> >> > > > >> > > > partition_epoch if the metadata already has the global
> >> epoch.
> >> > > > >> > > >
> >> > > > >> > > >
> >> > > > >> > > > On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <
> >> lindon...@gmail.com
> >> > >
> >> > > > >> wrote:
> >> > > > >> > > >
> >> > > > >> > > > > Hey Jun,
> >> > > > >> > > > >
> >> > > > >> > > > > Thanks so much. These comments are very useful. Please see my
> >> > > > >> > > > > comments below.
> >> > > > >> > > > >
> >> > > > >> > > > > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <
> >> j...@confluent.io>
> >> > > > wrote:
> >> > > > >> > > > >
> >> > > > >> > > > >> Hi, Dong,
> >> > > > >> > > > >>
> >> > > > >> > > > >> Thanks for the updated KIP. A few more comments.
> >> > > > >> > > > >>
> >> > > > >> > > > >> 60. Perhaps having a partition epoch is more flexible
> >> since
> >> > > in
> >> > > > >> the
> >> > > > >> > > > future,
> >> > > > >> > > > >> we may support deleting a partition as well.
> >> > > > >> > > > >>
> >> > > > >> > > > >
> >> > > > >> > > > > Yeah I have considered this. I think we can probably
> >> still
> >> > > > support
> >> > > > >> > > > > deleting a partition by using the topic_epoch -- when
> >> > > partition
> >> > > > >> of a
> >> > > > >> > > > topic
> >> > > > >> > > > > is deleted or created, epoch of all partitions of this
> >> topic
> >> > > > will
> >> > > > >> be
> >> > > > >> > > > > incremented by 1. Therefore, if that partition is
> >> re-created
> >> > > > >> later,
> >> > > > >> > the
> >> > > > >> > > > > epoch of that partition will still be larger than its
> >> epoch
> >> > > > before
> >> > > > >> > the
> >> > > > >> > > > > deletion, which still allows the client to order the
> >> > metadata
> >> > > > for
> >> > > > >> the
> >> > > > >> > > > > purpose of this KIP. Does this sound reasonable?
> >> > > > >> > > > >
> >> > > > >> > > > > The advantage of using topic_epoch instead of
> >> > partition_epoch
> >> > > is
> >> > > > >> that
> >> > > > >> > > the
> >> > > > >> > > > > size of the /brokers/topics/[topic] znode and
> >> > request/response
> >> > > > >> size
> >> > > > >> > can
> >> > > > >> > > > be
> >> > > > >> > > > > smaller. We have a limit on the maximum size of a znode
> >> > > > >> > > > > (typically 1MB). Using partition epochs would effectively reduce
> >> > > > >> > > > > the number of partitions that can be described by the
> >> > > > >> > > > > /brokers/topics/[topic] znode.
> >> > > > >> > > > >
> >> > > > >> > > > > One use-case of partition_epoch is for the client to detect
> >> > > > >> > > > > that the committed offset, either from the Kafka offset topic or
> >> > > > >> > > > > from the external store, is invalid after partition deletion and
> >> > > > >> > > > > re-creation.
> >> However,
> >> > it
> >> > > > >> seems
> >> > > > >> > > that
> >> > > > >> > > > we
> >> > > > >> > > > > can also address this use-case with other approaches.
> For
> >> > > > example,
> >> > > > >> > when
> >> > > > >> > > > > AdminClient deletes partitions, it can also delete the
> >> > > committed
> >> > > > >> > > offsets
> >> > > > >> > > > > for those partitions from the offset topic. If user
> >> stores
> >> > > > offset
> >> > > > >> > > > > externally, it might make sense for user to similarly
> >> remove
> >> > > > >> offsets
> >> > > > >> > of
> >> > > > >> > > > > related partitions after these partitions are deleted.
> >> So I
> >> > am
> >> > > > not
> >> > > > >> > sure
> >> > > > >> > > > > that we should use partition_epoch in this KIP.
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >>
> >> > > > >> > > > >> 61. It seems that the leader epoch returned in the position()
> >> > > > >> > > > >> call should be the leader epoch returned in the fetch
> >> > > > >> > > > >> response, not the one in the metadata cache of the client.
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > > I think this is a good idea. Just to double check, this
> >> > change
> >> > > > >> does
> >> > > > >> > not
> >> > > > >> > > > > affect the correctness or performance of this KIP. But
> it
> >> > can
> >> > > be
> >> > > > >> > useful
> >> > > > >> > > > if
> >> > > > >> > > > > we want to use the leader_epoch to better handle the offset
> >> > > > >> > > > > reset in case of unclean leader election, which is listed in
> >> > > > >> > > > > the future work. Is this understanding correct?
> >> > > > >> > > > >
> >> > > > >> > > > > I have updated the KIP to specify that the leader_epoch
> >> > > returned
> >> > > > >> by
> >> > > > >> > > > > position() should be the largest leader_epoch of those
> >> > already
> >> > > > >> > consumed
> >> > > > >> > > > > messages whose offset < position. If no message has
> been
> >> > > > consumed
> >> > > > >> > since
> >> > > > >> > > > > consumer initialization, the leader_epoch from seek()
> or
> >> > > > >> > > > > OffsetFetchResponse should be used. The offset included
> >> in
> >> > the
> >> > > > >> > > > > OffsetCommitRequest will also be determined in the
> >> similar
> >> > > > manner.
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >>
> >> > > > >> > > > >> 62. I am wondering if we should return the partition
> >> epoch
> >> > in
> >> > > > the
> >> > > > >> > > fetch
> >> > > > >> > > > >> response as well. In the current proposal, if a topic
> is
> >> > > > >> recreated
> >> > > > >> > and
> >> > > > >> > > > the
> >> > > > >> > > > >> new leader is on the same broker as the old one, there
> >> is
> >> > > > >> nothing to
> >> > > > >> > > > force
> >> > > > >> > > > >> the metadata refresh in the client. So, the client may
> >> > still
> >> > > > >> > associate
> >> > > > >> > > > the
> >> > > > >> > > > >> offset with the old partition epoch.
> >> > > > >> > > > >>
> >> > > > >> > > > >
> >> > > > >> > > > > Could you help me understand the problem if a client
> >> > > associates
> >> > > > >> old
> >> > > > >> > > > > partition_epoch (or the topic_epoch as of the current
> >> KIP)
> >> > > with
> >> > > > >> the
> >> > > > >> > > > offset?
> >> > > > >> > > > > The main purpose of the topic_epoch is to be able to
> drop
> >> > > > >> > leader_epoch
> >> > > > >> > > > to 0
> >> > > > >> > > > > after a partition is deleted and re-created. I guess
> you
> >> may
> >> > > be
> >> > > > >> > > thinking
> >> > > > >> > > > > about using the partition_epoch to detect that the
> >> committed
> >> > > > >> offset
> >> > > > >> > is
> >> > > > >> > > > > invalid? In that case, I am wondering if the
> alternative
> >> > > > approach
> >> > > > >> > > > described
> >> > > > >> > > > > in 60) would be reasonable.
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >>
> >> > > > >> > > > >> 63. There is some subtle coordination between the
> >> > > > >> > LeaderAndIsrRequest
> >> > > > >> > > > and
> >> > > > >> > > > >> UpdateMetadataRequest. Currently, when a leader
> changes,
> >> > the
> >> > > > >> > > controller
> >> > > > >> > > > >> first sends the LeaderAndIsrRequest to the assigned
> >> > replicas
> >> > > > and
> >> > > > >> the
> >> > > > >> > > > >> UpdateMetadataRequest to every broker. So, there could
> >> be a
> >> > > > small
> >> > > > >> > > window
> >> > > > >> > > > >> when the leader already receives the new partition
> >> epoch in
> >> > > the
> >> > > > >> > > > >> LeaderAndIsrRequest, but the metadata cache in the
> >> broker
> >> > > > hasn't
> >> > > > >> > been
> >> > > > >> > > > >> updated with the latest partition epoch. Not sure
> what's
> >> > the
> >> > > > best
> >> > > > >> > way
> >> > > > >> > > to
> >> > > > >> > > > >> address this issue. Perhaps we can update the metadata
> >> > cache
> >> > > on
> >> > > > >> the
> >> > > > >> > > > broker
> >> > > > >> > > > >> with both LeaderAndIsrRequest and
> UpdateMetadataRequest.
> >> > The
> >> > > > >> > challenge
> >> > > > >> > > > is
> >> > > > >> > > > >> that the two have slightly different data. For
> example,
> >> > only
> >> > > > the
> >> > > > >> > > latter
> >> > > > >> > > > >> has
> >> > > > >> > > > >> all endpoints.
> >> > > > >> > > > >>
> >> > > > >> > > > >
> >> > > > >> > > > > I am not sure whether this is a problem. Could you
> >> explain a
> >> > > bit
> >> > > > >> more
> >> > > > >> > > > what
> >> > > > >> > > > > specific problem this small window can cause?
> >> > > > >> > > > >
> >> > > > >> > > > > Since client can fetch metadata from any broker in the
> >> > > cluster,
> >> > > > >> and
> >> > > > >> > > given
> >> > > > >> > > > > that different brokers receive request (e.g.
> >> > > LeaderAndIsrRequest
> >> > > > >> and
> >> > > > >> > > > > UpdateMetadataRequest) in arbitrary order, the metadata
> >> > > received
> >> > > > >> by
> >> > > > >> > > > client
> >> > > > >> > > > > can be in arbitrary order (either newer or older)
> >> compared
> >> > to
> >> > > > the
> >> > > > >> > > > broker's
> >> > > > >> > > > > leadership state even if a given broker receives
> >> > > > >> LeaderAndIsrRequest
> >> > > > >> > > and
> >> > > > >> > > > > UpdateMetadataRequest simultaneously. So I am not sure
> >> it is
> >> > > > >> useful
> >> > > > >> > to
> >> > > > >> > > > > update broker's cache with LeaderAndIsrRequest.
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >> 64. The enforcement of leader epoch in Offset commit:
> We
> >> > > allow
> >> > > > a
> >> > > > >> > > > consumer
> >> > > > >> > > > >> to set an arbitrary offset. So it's possible for
> >> offsets or
> >> > > > >> leader
> >> > > > >> > > epoch
> >> > > > >> > > > >> to
> >> > > > >> > > > >> go backwards. I am not sure if we could always enforce
> >> that
> >> > > the
> >> > > > >> > leader
> >> > > > >> > > > >> epoch only goes up on the broker.
> >> > > > >> > > > >>
> >> > > > >> > > > >
> >> > > > >> > > > > Sure. I have removed this check from the KIP.
> >> > > > >> > > > >
> >> > > > >> > > > > BTW, we can probably still ensure that the leader_epoch always
> >> > > > >> > > > > increases if the leader_epoch used with the offset commit is
> >> > > > >> > > > > max(leader_epoch of the message with offset = the committed
> >> > > > >> > > > > offset - 1, the largest known leader_epoch from the metadata).
> >> > > > >> > > > > But I don't have a good use-case for this alternative
> >> > > > >> > > > > definition. So I chose to keep the KIP simple rather than
> >> > > > >> > > > > requiring the leader_epoch to always increase.
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >> 65. Good point on handling missing partition epoch due
> >> to
> >> > > topic
> >> > > > >> > > > deletion.
> >> > > > >> > > > >> Another potential way to address this is to
> additionally
> >> > > > >> propagate
> >> > > > >> > the
> >> > > > >> > > > >> global partition epoch to brokers and the clients.
> This
> >> > way,
> >> > > > >> when a
> >> > > > >> > > > >> partition epoch is missing, we can use the global
> >> partition
> >> > > > >> epoch to
> >> > > > >> > > > >> reason
> >> > > > >> > > > >> about which metadata is more recent.
> >> > > > >> > > > >>
> >> > > > >> > > > >
> >> > > > >> > > > > This is a great idea. The global epoch can be used to
> >> order
> >> > > the
> >> > > > >> > > metadata
> >> > > > >> > > > > and help us recognize the more recent metadata if a
> topic
> >> > (or
> >> > > > >> > > partition)
> >> > > > >> > > > is
> >> > > > >> > > > > deleted and re-created.
> >> > > > >> > > > >
> >> > > > >> > > > > Actually, it seems we only need to propagate the global epoch to
> >> > > > >> > > > > brokers and clients without propagating this epoch on a per-topic
> >> > > > >> > > > > or per-partition basis. Doing so would simplify the interface
> >> > > > >> > > > > changes made in this KIP. Does this approach sound reasonable?
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >> 66. A client may also get an offset by time using the
> >> > > > >> > offsetForTimes()
> >> > > > >> > > > >> api.
> >> > > > >> > > > >> So, we probably want to include offsetInternalMetadata
> >> in
> >> > > > >> > > > >> OffsetAndTimestamp
> >> > > > >> > > > >> as well.
> >> > > > >> > > > >>
> >> > > > >> > > > >
> >> > > > >> > > > > You are right. This probably also requires us to change
> >> the
> >> > > > >> > > > > ListOffsetRequest as well. I will update the KIP after
> we
> >> > > agree
> >> > > > on
> >> > > > >> > the
> >> > > > >> > > > > solution for 65).
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >>
> >> > > > >> > > > >> 67. InternalMetadata can be a bit confusing with the metadata
> >> > > > >> > > > >> field already there. Perhaps we can just call it OffsetEpoch. It
> >> might be
> >> > > > >> useful
> >> > > > >> > to
> >> > > > >> > > > make
> >> > > > >> > > > >> OffsetEpoch printable at least for debugging purpose.
> >> Once
> >> > > you
> >> > > > do
> >> > > > >> > > that,
> >> > > > >> > > > we
> >> > > > >> > > > >> are already exposing the internal fields. So, not sure
> >> if
> >> > > it's
> >> > > > >> worth
> >> > > > >> > > > >> hiding
> >> > > > >> > > > >> them. If we do want to hide them, perhaps we can have
> >> sth
> >> > > like
> >> > > > >> the
> >> > > > >> > > > >> following. The binary encoding is probably more
> >> efficient
> >> > > than
> >> > > > >> JSON
> >> > > > >> > > for
> >> > > > >> > > > >> external storage.
> >> > > > >> > > > >>
> >> > > > >> > > > >> OffsetEpoch {
> >> > > > >> > > > >>  static OffsetEpoch decode(byte[]);
> >> > > > >> > > > >>
> >> > > > >> > > > >>   public byte[] encode();
> >> > > > >> > > > >>
> >> > > > >> > > > >>   public String toString();
> >> > > > >> > > > >> }
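> >> > > > >> > > > >>
> >> > > > >> > > > >> A hypothetical usage sketch with externally stored offsets (the
> >> > > > >> > > > >> "externalStore" object and the variables are illustrative only):
> >> > > > >> > > > >>
> >> > > > >> > > > >> byte[] epochBytes = offsetEpoch.encode();   // persist next to the offset
> >> > > > >> > > > >> externalStore.save(partition, offset, epochBytes);
> >> > > > >> > > > >> OffsetEpoch restored = OffsetEpoch.decode(epochBytes);
> >> > > > >> > > > >> System.out.println(restored);               // printable for debugging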
> >> > > > >> > > > >>
> >> > > > >> > > > >
> >> > > > >> > > > > Thanks much. I like this solution. I have updated the
> KIP
> >> > > > >> > accordingly.
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >>
> >> > > > >> > > > >> Jun
> >> > > > >> > > > >>
> >> > > > >> > > > >> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <
> >> > > lindon...@gmail.com>
> >> > > > >> > wrote:
> >> > > > >> > > > >>
> >> > > > >> > > > >> > Hey Jason,
> >> > > > >> > > > >> >
> >> > > > >> > > > >> > Certainly. This sounds good. I have updated the KIP to
> >> > > > >> > > > >> > clarify that the global epoch will be incremented by 1 each
> >> > > > >> > > > >> > time a topic is deleted.
> >> > > > >> > > > >> >
> >> > > > >> > > > >> > Thanks,
> >> > > > >> > > > >> > Dong
> >> > > > >> > > > >> >
> >> > > > >> > > > >> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <
> >> > > > >> > ja...@confluent.io
> >> > > > >> > > >
> >> > > > >> > > > >> > wrote:
> >> > > > >> > > > >> >
> >> > > > >> > > > >> > > Hi Dong,
> >> > > > >> > > > >> > >
> >> > > > >> > > > >> > >
> >> > > > >> > > > >> > > I think your approach will allow user to
> distinguish
> >> > > > between
> >> > > > >> the
> >> > > > >> > > > >> metadata
> >> > > > >> > > > >> > > > before and after the topic deletion. I also
> agree
> >> > that
> >> > > > this
> >> > > > >> > can
> >> > > > >> > > be
> >> > > > >> > > > >> > > > potentially be useful to user. I am just not
> very
> >> > sure
> >> > > > >> whether
> >> > > > >> > > we
> >> > > > >> > > > >> > already
> >> > > > >> > > > >> > > > have a good use-case to make the additional
> >> > complexity
> >> > > > >> > > worthwhile.
> >> > > > >> > > > >> It
> >> > > > >> > > > >> > > seems
> >> > > > >> > > > >> > > > that this feature is kind of independent of the
> >> main
> >> > > > >> problem
> >> > > > >> > of
> >> > > > >> > > > this
> >> > > > >> > > > >> > KIP.
> >> > > > >> > > > >> > > > Could we add this as a future work?
> >> > > > >> > > > >> > >
> >> > > > >> > > > >> > >
> >> > > > >> > > > >> > > Do you think it's fair if we bump the topic epoch
> on
> >> > > > deletion
> >> > > > >> > and
> >> > > > >> > > > >> leave
> >> > > > >> > > > >> > > propagation of the epoch for deleted topics for
> >> future
> >> > > > work?
> >> > > > >> I
> >> > > > >> > > don't
> >> > > > >> > > > >> > think
> >> > > > >> > > > >> > > this adds much complexity and it makes the
> behavior
> >> > > > >> consistent:
> >> > > > >> > > > every
> >> > > > >> > > > >> > topic
> >> > > > >> > > > >> > > mutation results in an epoch bump.
> >> > > > >> > > > >> > >
> >> > > > >> > > > >> > > Thanks,
> >> > > > >> > > > >> > > Jason
> >> > > > >> > > > >> > >
> >> > > > >> > > > >> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <
> >> > > > >> lindon...@gmail.com>
> >> > > > >> > > > wrote:
> >> > > > >> > > > >> > >
> >> > > > >> > > > >> > > > Hey Ismael,
> >> > > > >> > > > >> > > >
> >> > > > >> > > > >> > > > I guess we actually need user to see this field
> so
> >> > that
> >> > > > >> user
> >> > > > >> > can
> >> > > > >> > > > >> store
> >> > > > >> > > > >> > > this
> >> > > > >> > > > >> > > > value in the external store together with the
> >> offset.
> >> > > We
> >> > > > >> just
> >> > > > >> > > > prefer
> >> > > > >> > > > >> > the
> >> > > > >> > > > >> > > > value to be opaque to discourage most users from
> >> > > > >> interpreting
> >> > > > >> > > this
> >> > > > >> > > > >> > value.
> >> > > > >> > > > >> > > > One more advantage of using such an opaque field
> >> is
> >> > to
> >> > > be
> >> > > > >> able
> >> > > > >> > > to
> >> > > > >> > > > >> > evolve
> >> > > > >> > > > >> > > > the information (or schema) of this value
> without
> >> > > > changing
> >> > > > >> > > > consumer
> >> > > > >> > > > >> API
> >> > > > >> > > > >> > > in
> >> > > > >> > > > >> > > > the future.
> >> > > > >> > > > >> > > >
> >> > > > >> > > > >> > > > I also think it is probably OK for users to be able to
> >> > > > >> > > > >> > > > interpret this value, particularly for those advanced
> >> > > > >> > > > >> > > > users.
> >> > > > >> > > > >> > > >
> >> > > > >> > > > >> > > > Thanks,
> >> > > > >> > > > >> > > > Dong
> >> > > > >> > > > >> > > >
> >> > > > >> > > > >> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <
> >> > > > >> > ism...@juma.me.uk>
> >> > > > >> > > > >> wrote:
> >> > > > >> > > > >> > > >
> >> > > > >> > > > >> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason
> Gustafson
> >> <
> >> > > > >> > > > >> ja...@confluent.io>
> >> > > > >> > > > >> > > > > wrote:
> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > > > class OffsetAndMetadata {
> >> > > > >> > > > >> > > > > >   long offset;
> >> > > > >> > > > >> > > > > >   byte[] offsetMetadata;
> >> > > > >> > > > >> > > > > >   String metadata;
> >> > > > >> > > > >> > > > > > }
> >> > > > >> > > > >> > > > >
> >> > > > >> > > > >> > > > >
> >> > > > >> > > > >> > > > > > Admittedly, the naming is a bit annoying,
> but
> >> we
> >> > > can
> >> > > > >> > > probably
> >> > > > >> > > > >> come
> >> > > > >> > > > >> > up
> >> > > > >> > > > >> > > > > with
> >> > > > >> > > > >> > > > > > something better. Internally the byte array
> >> would
> >> > > > have
> >> > > > >> a
> >> > > > >> > > > >> version.
> >> > > > >> > > > >> > If
> >> > > > >> > > > >> > > in
> >> > > > >> > > > >> > > > > the
> >> > > > >> > > > >> > > > > > future we have anything else we need to add,
> >> we
> >> > can
> >> > > > >> update
> >> > > > >> > > the
> >> > > > >> > > > >> > > version
> >> > > > >> > > > >> > > > > and
> >> > > > >> > > > >> > > > > > we wouldn't need any new APIs.
> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > >
> >> > > > >> > > > >> > > > > We can also add fields to a class in a
> >> compatible
> >> > > way.
> >> > > > >> So,
> >> > > > >> > it
> >> > > > >> > > > >> seems
> >> > > > >> > > > >> > to
> >> > > > >> > > > >> > > me
> >> > > > >> > > > >> > > > > that the main advantage of the byte array is
> >> that
> >> > > it's
> >> > > > >> > opaque
> >> > > > >> > > to
> >> > > > >> > > > >> the
> >> > > > >> > > > >> > > > user.
> >> > > > >> > > > >> > > > > Is that correct? If so, we could also add any
> >> > opaque
> >> > > > >> > metadata
> >> > > > >> > > > in a
> >> > > > >> > > > >> > > > subclass
> >> > > > >> > > > >> > > > > so that users don't even see it (unless they
> >> cast
> >> > it,
> >> > > > but
> >> > > > >> > then
> >> > > > >> > > > >> > they're
> >> > > > >> > > > >> > > on
> >> > > > >> > > > >> > > > > their own).
> >> > > > >> > > > >> > > > >
> >> > > > >> > > > >> > > > > Ismael
> >> > > > >> > > > >> > > > >
> >> > > > >> > > > >> > > > > The corresponding seek() and position() APIs
> >> might
> >> > > look
> >> > > > >> > > > something
> >> > > > >> > > > >> > like
> >> > > > >> > > > >> > > > > this:
> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > > > void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> >> > > > >> > > > >> > > > > > byte[] positionMetadata(TopicPartition partition);
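> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > > > A hypothetical usage sketch with an external offset store
> >> > > > >> > > > >> > > > > > (the "store" object and its methods are illustrative only):
> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > > > // After consuming, save the position plus its opaque metadata.
> >> > > > >> > > > >> > > > > > store.save(tp, consumer.position(tp), consumer.positionMetadata(tp));
> >> > > > >> > > > >> > > > > > // On restart, restore both before resuming consumption.
> >> > > > >> > > > >> > > > > > consumer.seek(tp, store.offset(tp), store.metadata(tp));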
> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > > > What do you think?
> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > > > Thanks,
> >> > > > >> > > > >> > > > > > Jason
> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <
> >> > > > >> > > lindon...@gmail.com
> >> > > > >> > > > >
> >> > > > >> > > > >> > > wrote:
> >> > > > >> > > > >> > > > > >
> >> > > > >> > > > >> > > > > > > Hey Jun, Jason,
> >> > > > >> > > > >> > > > > > >
> >> > > > >> > > > >> > > > > > > Thanks much for all the feedback. I have
> >> > updated
> >> > > > the
> >> > > > >> KIP
> >> > > > >> > > > >> based on
> >> > > > >> > > > >> > > the
> >> > > > >> > > > >> > > > > > > latest discussion. Can you help check
> >> whether
> >> > it
> >> > > > >> looks
> >> > > > >> > > good?
> >> > > > >> > > > >> > > > > > >
> >> > > > >> > > > >> > > > > > > Thanks,
> >> > > > >> > > > >> > > > > > > Dong
> >> > > > >> > > > >> > > > > > >
> >> > > > >> > > > >> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <
> >> > > > >> > > > lindon...@gmail.com
> >> > > > >> > > > >> >
> >> > > > >> > > > >> > > > wrote:
> >> > > > >> > > > >> > > > > > >
> >> > > > >> > > > >> > > > > > > > Hey Jun,
> >> > > > >> > > > >> > > > > > > >
> >> > > > >> > > > >> > > > > > > > Hmm... thinking about this more, I am
> not
> >> > sure
> >> > > > that
> >> > > > >> > the
> >> > > > >> > > > >> > proposed
> >> > > > >> > > > >> > > > API
> >> > > > >> > > > >> > > > > is
> >> > > > >> > > > >> > > > > > > > sufficient. For users that store offset
> >> > > > >> externally, we
> >> > > > >> > > > >> probably
> >> > > > >> > > > >> > > > need
> >> > > > >> > > > >> > > > > > > extra
> >> > > > >> > > > >> > > > > > > > API to return the leader_epoch and
> >> > > > partition_epoch
> >> > > > >> for
> >> > > > >> > > all
> >> > > > >> > > > >> > > > partitions
> >> > > > >> > > > >> > > > > > > that
> >> > > > >> > > > >> > > > > > > > consumers are consuming. I suppose these
> >> > users
> >> > > > >> > currently
> >> > > > >> > > > use
> >> > > > >> > > > >> > > > > position()
> >> > > > >> > > > >> > > > > > > to
> >> > > > >> > > > >> > > > > > > > get the offset. Thus we probably need a
> >> new
> >> > > > method
> >> > > > >> > > > >> > > > > > positionWithEpoch(..)
> >> > > > >> > > > >> > > > > > > to
> >> > > > >> > > > >> > > > > > > > return <offset, partition_epoch,
> >> > leader_epoch>.
> >> > > > >> Does
> >> > > > >> > > this
> >> > > > >> > > > >> sound
> >> > > > >> > > > >> > > > > > > reasonable?
> >> > > > >> > > > >> > > > > > > >
> >> > > > >> > > > >> > > > > > > > Thanks,
> >> > > > >> > > > >> > > > > > > > Dong
> >> > > > >> > > > >> > > > > > > >
> >> > > > >> > > > >> > > > > > > >
> >> > > > >> > > > >> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao
> <
> >> > > > >> > > j...@confluent.io
> >> > > > >> > > > >
> >> > > > >> > > > >> > > wrote:
> >> > > > >> > > > >> > > > > > > >
> >> > > > >> > > > >> > > > > > > >> Hi, Dong,
> >> > > > >> > > > >> > > > > > > >>
> >> > > > >> > > > >> > > > > > > >> Yes, that's what I am thinking.
> >> OffsetEpoch
> >> > > will
> >> > > > >> be
> >> > > > >> > > > >> composed
> >> > > > >> > > > >> > of
> >> > > > >> > > > >> > > > > > > >> (partition_epoch,
> >> > > > >> > > > >> > > > > > > >> leader_epoch).
> >> > > > >> > > > >> > > > > > > >>
> >> > > > >> > > > >> > > > > > > >> Thanks,
> >> > > > >> > > > >> > > > > > > >>
> >> > > > >> > > > >> > > > > > > >> Jun
> >> > > > >> > > > >> > > > > > > >>
> >> > > > >> > > > >> > > > > > > >>
> >> > > > >> > > > >> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong
> Lin
> >> <
> >> > > > >> > > > >> lindon...@gmail.com
> >> > > > >> > > > >> > >
> >> > > > >> > > > >> > > > > wrote:
> >> > > > >> > > > >> > > > > > > >>
> >> > > > >> > > > >> > > > > > > >> > Hey Jun,
> >> > > > >> > > > >> > > > > > > >> >
> >> > > > >> > > > >> > > > > > > >> > Thanks much. I like the the new API
> >> that
> >> > you
> >> > > > >> > > proposed.
> >> > > > >> > > > I
> >> > > > >> > > > >> am
> >> > > > >> > > > >> > > not
> >> > > > >> > > > >> > > > > sure
> >> > > > >> > > > >> > > > > > > >> what
> >> > > > >> > > > >> > > > > > > >> > you exactly mean by offset_epoch. I
> >> > suppose
> >> > > > >> that we
> >> > > > >> > > can
> >> > > > >> > > > >> use
> >> > > > >> > > > >> > > the
> >> > > > >> > > > >> > > > > pair
> >> > > > >> > > > >> > > > > > > of
> >> > > > >> > > > >> > > > > > > >> > (partition_epoch, leader_epoch) as
> the
> >> > > > >> > offset_epoch,
> >> > > > >> > > > >> right?
> >> > > > >> > > > >> > > > > > > >> >
> >> > > > >> > > > >> > > > > > > >> > Thanks,
> >> > > > >> > > > >> > > > > > > >> > Dong
> >> > > > >> > > > >> > > > > > > >> >
> >> > > > >> > > > >> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun
> >> Rao <
> >> > > > >> > > > >> j...@confluent.io>
> >> > > > >> > > > >> > > > wrote:
> >> > > > >> > > > >> > > > > > > >> >
> >> > > > >> > > > >> > > > > > > >> > > Hi, Dong,
> >> > > > >> > > > >> > > > > > > >> > >
> >> > > > >> > > > >> > > > > > > >> > > Got it. The api that you proposed
> >> works.
> >> > > The
> >> > > > >> > > question
> >> > > > >> > > > >> is
> >> > > > >> > > > >> > > > whether
> >> > > > >> > > > >> > > > > > > >> that's
> >> > > > >> > > > >> > > > > > > >> > the
> >> > > > >> > > > >> > > > > > > >> > > api that we want to have in the
> long
> >> > term.
> >> > > > My
> >> > > > >> > > concern
> >> > > > >> > > > >> is
> >> > > > >> > > > >> > > that
> >> > > > >> > > > >> > > > > > while
> >> > > > >> > > > >> > > > > > > >> the
> >> > > > >> > > > >> > > > > > > >> > api
> >> > > > >> > > > >> > > > > > > >> > > change is simple, the new api seems
> >> > harder
> >> > > > to
> >> > > > >> > > explain
> >> > > > >> > > > >> and
> >> > > > >> > > > >> > > use.
> >> > > > >> > > > >> > > > > For
> >> > > > >> > > > >> > > > > > > >> > example,
> >> > > > >> > > > >> > > > > > > >> > > a consumer storing offsets
> externally
> >> > now
> >> > > > >> needs
> >> > > > >> > to
> >> > > > >> > > > call
> >> > > > >> > > > >> > > > > > > >> > > waitForMetadataUpdate() after
> calling
> >> > > > seek().
> >> > > > >> > > > >> > > > > > > >> > >
> >> > > > >> > > > >> > > > > > > >> > > An alternative approach is to make
> >> the
> >> > > > >> following
> >> > > > >> > > > >> > compatible
> >> > > > >> > > > >> > > > api
> >> > > > >> > > > >> > > > > > > >> changes
> >> > > > >> > > > >> > > > > > > >> > in
> >> > > > >> > > > >> > > > > > > >> > > Consumer.
> >> > > > >> > > > >> > > > > > > >> > > * Add an additional OffsetEpoch
> >> field in
> >> > > > >> > > > >> > OffsetAndMetadata.
> >> > > > >> > > > >> > > > (no
> >> > > > >> > > > >> > > > > > need
> >> > > > >> > > > >> > > > > > > >> to
> >> > > > >> > > > >> > > > > > > >> > > change the CommitSync() api)
> >> > > > >> > > > >> > > > > > > >> > > * Add a new api seek(TopicPartition
> >> > > > partition,
> >> > > > >> > long
> >> > > > >> > > > >> > offset,
> >> > > > >> > > > >> > > > > > > >> OffsetEpoch
> >> > > > >> > > > >> > > > > > > >> > > offsetEpoch). We can potentially
> >> > deprecate
> >> > > > the
> >> > > > >> > old
> >> > > > >> > > > api
> >> > > > >> > > > >> > > > > > > >> > seek(TopicPartition
> >> > > > >> > > > >> > > > > > > >> > > partition, long offset) in the
> >> future.
> >> > > > >> > > > >> > > > > > > >> > >
> >> > > > >> > > > >> > > > > > > >> > > The alternative approach has
> similar
> >> > > amount
> >> > > > of
> >> > > > >> > api
> >> > > > >> > > > >> changes
> >> > > > >> > > > >> > > as
> >> > > > >> > > > >> > > > > > yours
> >> > > > >> > > > >> > > > > > > >> but
> >> > > > >> > > > >> > > > > > > >> > has
> >> > > > >> > > > >> > > > > > > >> > > the following benefits.
> >> > > > >> > > > >> > > > > > > >> > > 1. The api works in a similar way
> as
> >> how
> >> > > > >> offset
> >> > > > >> > > > >> management
> >> > > > >> > > > >> > > > works
> >> > > > >> > > > >> > > > > > now
> >> > > > >> > > > >> > > > > > > >> and
> >> > > > >> > > > >> > > > > > > >> > is
> >> > > > >> > > > >> > > > > > > >> > > probably what we want in the long
> >> term.
> >> > > > >> > > > >> > > > > > > >> > > 2. It can reset offsets better when
> >> > there
> >> > > is
> >> > > > >> data
> >> > > > >> > > > loss
> >> > > > >> > > > >> due
> >> > > > >> > > > >> > > to
> >> > > > >> > > > >> > > > > > > unclean
> >> > > > >> > > > >> > > > > > > >> > > leader election or correlated
> replica
> >> > > > failure.
> >> > > > >> > > > >> > > > > > > >> > > 3. It can reset offsets better when
> >> > topic
> >> > > is
> >> > > > >> > > > recreated.
> >> > > > >> > > > >> > > > > > > >> > >
> >> > > > >> > > > >> > > > > > > >> > > Thanks,
> >> > > > >> > > > >> > > > > > > >> > >
> >> > > > >> > > > >> > > > > > > >> > > Jun
> >> > > > >> > > > >> > > > > > > >> > >
> >> > > > >> > > > >> > > > > > > >> > >
> >> > > > >> > > > >> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM,
> Dong
> >> > Lin <
> >> > > > >> > > > >> > > lindon...@gmail.com
> >> > > > >> > > > >> > > > >
> >> > > > >> > > > >> > > > > > > wrote:
> >> > > > >> > > > >> > > > > > > >> > >
> >> > > > >> > > > >> > > > > > > >> > > > Hey Jun,
> >> > > > >> > > > >> > > > > > > >> > > >
> >> > > > >> > > > >> > > > > > > >> > > > Yeah I agree that ideally we
> don't
> >> > want
> >> > > an
> >> > > > >> ever
> >> > > > >> > > > >> growing
> >> > > > >> > > > >> > > > global
> >> > > > >> > > > >> > > > > > > >> metadata
> >> > > > >> > > > >> > > > > > > >> > > > version. I just think it may be
> >> more
> >> > > > >> desirable
> >> > > > >> > to
> >> > > > >> > > > >> keep
> >> > > > >> > > > >> > the
> >> > > > >> > > > >> > > > > > > consumer
> >> > > > >> > > > >> > > > > > > >> API
> >> > > > >> > > > >> > > > > > > >> > > > simple.
> >> > > > >> > > > >> > > > > > > >> > > >
> >> > > > >> > > > >> > > > > > > >> > > > In my current proposal, metadata
> >> > version
> >> > > > >> > returned
> >> > > > >> > > > in
> >> > > > >> > > > >> the
> >> > > > >> > > > >> > > > fetch
> >> > > > >> > > > >> > > > > > > >> response
> >> > > > >> > > > >> > > > > > > >> > > > will be stored with the offset
> >> > together.
> >> > > > >> More
> >> > > > >> > > > >> > > specifically,
> >> > > > >> > > > >> > > > > the
> >> > > > >> > > > >> > > > > > > >> > > > metadata_epoch in the new offset
> >> topic
> >> > > > >> schema
> >> > > > >> > > will
> >> > > > >> > > > be
> >> > > > >> > > > >> > the
> >> > > > >> > > > >> > > > > > largest
> >> > > > >> > > > >> > > > > > > >> > > > metadata_epoch from all the
> >> > > > MetadataResponse
> >> > > > >> > and
> >> > > > >> > > > >> > > > FetchResponse
> >> > > > >> > > > >> > > > > > > ever
> >> > > > >> > > > >> > > > > > > >> > > > received by this consumer.
> >> > > > >> > > > >> > > > > > > >> > > >
> >> > > > >> > > > >> > > > > > > >> > > > We probably don't have to change
> >> the
> >> > > > >> consumer
> >> > > > >> > API
> >> > > > >> > > > for
> >> > > > >> > > > >> > > > > > > >> > > > commitSync(Map<TopicPartition,
> >> > > > >> > > OffsetAndMetadata>).
> >> > > > >> > > > >> If
> >> > > > >> > > > >> > > user
> >> > > > >> > > > >> > > > > > calls
> >> > > > >> > > > >> > > > > > > >> > > > commitSync(...) to commit offset
> 10
> >> > for
> >> > > a
> >> > > > >> given
> >> > > > >> > > > >> > partition,
> >> > > > >> > > > >> > > > for
> >> > > > >> > > > >> > > > > > > most
> >> > > > >> > > > >> > > > > > > >> > > > use-cases, this consumer instance
> >> > should
> >> > > > >> have
> >> > > > >> > > > >> consumed
> >> > > > >> > > > >> > > > message
> >> > > > >> > > > >> > > > > > > with
> >> > > > >> > > > >> > > > > > > >> > > offset
> >> > > > >> > > > >> > > > > > > >> > > > 9 from this partition, in which
> >> case
> >> > the
> >> > > > >> > consumer
> >> > > > >> > > > can
> >> > > > >> > > > >> > > > remember
> >> > > > >> > > > >> > > > > > and
> >> > > > >> > > > >> > > > > > > >> use
> >> > > > >> > > > >> > > > > > > >> > > the
> >> > > > >> > > > >> > > > > > > >> > > > metadata_epoch from the
> >> corresponding
> >> > > > >> > > FetchResponse
> >> > > > >> > > > >> when
> >> > > > >> > > > >> > > > > > > committing
> >> > > > >> > > > >> > > > > > > >> > > offset.
> >> > > > >> > > > >> > > > > > > >> > > > If user calls commitSync(..) to
> >> commit
> >> > > > >> offset
> >> > > > >> > 10
> >> > > > >> > > > for
> >> > > > >> > > > >> a
> >> > > > >> > > > >> > > given
> >> > > > >> > > > >> > > > > > > >> partition
> >> > > > >> > > > >> > > > > > > >> > > > without having consumed the
> message
> >> > with
> >> > > > >> > offset 9
> >> > > > >> > > > >> using
> >> > > > >> > > > >> > > this
> >> > > > >> > > > >> > > > > > > >> consumer
> >> > > > >> > > > >> > > > > > > >> > > > instance, this is probably an
> >> advanced
> >> > > > >> > use-case.
> >> > > > >> > > In
> >> > > > >> > > > >> this
> >> > > > >> > > > >> > > > case
> >> > > > >> > > > >> > > > > > the
> >> > > > >> > > > >> > > > > > > >> > > advanced
> >> > > > >> > > > >> > > > > > > >> > > > user can retrieve the
> >> metadata_epoch
> >> > > using
> >> > > > >> the
> >> > > > >> > > > newly
> >> > > > >> > > > >> > added
> >> > > > >> > > > >> > > > > > > >> > > metadataEpoch()
> >> > > > >> > > > >> > > > > > > >> > > > API after it fetches the message
> >> with
> >> > > > >> offset 9
> >> > > > >> > > > >> (probably
> >> > > > >> > > > >> > > > from
> >> > > > >> > > > >> > > > > > > >> another
> >> > > > >> > > > >> > > > > > > >> > > > consumer instance) and encode
> this
> >> > > > >> > metadata_epoch
> >> > > > >> > > > in
> >> > > > >> > > > >> the
> >> > > > >> > > > >> > > > > > > >> > > > string
> OffsetAndMetadata.metadata.
> >> Do
> >> > > you
> >> > > > >> think
> >> > > > >> > > > this
> >> > > > >> > > > >> > > > solution
> >> > > > >> > > > >> > > > > > > would
> >> > > > >> > > > >> > > > > > > >> > work?
> >> > > > >> > > > >> > > > > > > >> > > >
> >> > > > >> > > > >> > > > > > > >> > > > By "not sure that I fully
> >> understand
> >> > > your
> >> > > > >> > latest
> >> > > > >> > > > >> > > > suggestion",
> >> > > > >> > > > >> > > > > > are
> >> > > > >> > > > >> > > > > > > >> you
> >> > > > >> > > > >> > > > > > > >> > > > referring to solution related to
> >> > unclean
> >> > > > >> leader
> >> > > > >> > > > >> election
> >> > > > >> > > > >> > > > using
> >> > > > >> > > > >> > > > > > > >> > > leader_epoch
> >> > > > >> > > > >> > > > > > > >> > > > in my previous email?
> >> > > > >> > > > >> > > > > > > >> > > >
> >> > > > >> > > > >> > > > > > > >> > > > Thanks,
> >> > > > >> > > > >> > > > > > > >> > > > Dong
> >> > > > >> > > > >> > > > > > > >> > > >
> >> > > > >> > > > >> > > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM,
> Jun
> >> > Rao
> >> > > <
> >> > > > >> > > > >> > j...@confluent.io
> >> > > > >> > > > >> > > >
> >> > > > >> > > > >> > > > > > wrote:
> >> > > > >> > > > >> > > > > > > >> > > >
> >> > > > >> > > > >> > > > > > > >> > > > > Hi, Dong,
> >> > > > >> > > > >> > > > > > > >> > > > >
> >> > > > >> > > > >> > > > > > > >> > > > > Not sure that I fully
> understand
> >> > your
> >> > > > >> latest
> >> > > > >> > > > >> > suggestion.
> >> > > > >> > > > >> > > > > > > >> Returning an
> >> > > > >> > > > >> > > > > > > >> > > > ever
> >> > > > >> > > > >> > > > > > > >> > > > > growing global metadata version
> >> > itself
> >> > > > is
> >> > > > >> no
> >> > > > >> > > > ideal,
> >> > > > >> > > > >> > but
> >> > > > >> > > > >> > > is
> >> > > > >> > > > >> > > > > > fine.
> >> > > > >> > > > >> > > > > > > >> My
> >> > > > >> > > > >> > > > > > > >> > > > > question is whether the
> metadata
> >> > > version
> >> > > > >> > > returned
> >> > > > >> > > > >> in
> >> > > > >> > > > >> > the
> >> > > > >> > > > >> > > > > fetch
> >> > > > >> > > > >> > > > > > > >> > response
> >> > > > >> > > > >> > > > > > > >> > > > > needs to be stored with the
> >> offset
> >> > > > >> together
> >> > > > >> > if
> >> > > > >> > > > >> offsets
> >> > > > >> > > > >> > > are
> >> > > > >> > > > >> > > > > > > stored
> >> > > > >> > > > >> > > > > > > >> > > > > externally. If so, we also have
> >> to
> >> > > > change
> >> > > > >> the
> >> > > > >> > > > >> consumer
> >> > > > >> > > > >> > > API
> >> > > > >> > > > >> > > > > for
> >> > > > >> > > > >> > > > > > > >> > > > commitSync()
> >> > > > >> > > > >> > > > > > > >> > > > > and need to worry about
> >> > compatibility.
> >> > > > If
> >> > > > >> we
> >> > > > >> > > > don't
> >> > > > >> > > > >> > store
> >> > > > >> > > > >> > > > the
> >> > > > >> > > > >> > > > > > > >> metadata
> >> > > > >> > > > >> > > > > > > >> > > > > version together with the
> offset,
> >> > on a
> >> > > > >> > consumer
> >> > > > >> > > > >> > restart,
> >> > > > >> > > > >> > > > > it's
> >> > > > >> > > > >> > > > > > > not
> >> > > > >> > > > >> > > > > > > >> > clear
> >> > > > >> > > > >> > > > > > > >> > > > how
> >> > > > >> > > > >> > > > > > > >> > > > > we can ensure the metadata in
> the
> >> > > > >> consumer is
> >> > > > >> > > > high
> >> > > > >> > > > >> > > enough
> >> > > > >> > > > >> > > > > > since
> >> > > > >> > > > >> > > > > > > >> there
> >> > > > >> > > > >> > > > > > > >> > > is
> >> > > > >> > > > >> > > > > > > >> > > > no
> >> > > > >> > > > >> > > > > > > >> > > > > metadata version to compare
> with.
> >> > > > >> > > > >> > > > > > > >> > > > >
> >> > > > >> > > > >> > > > > > > >> > > > > Thanks,
> >> > > > >> > > > >> > > > > > > >> > > > >
> >> > > > >> > > > >> > > > > > > >> > > > > Jun
> >> > > > >> > > > >> > > > > > > >> > > > >
> >> > > > >> > > > >> > > > > > > >> > > > >
> >> > > > >> > > > >> > > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM,
> >> Dong
> >> > > > Lin <
> >> > > > >> > > > >> > > > > lindon...@gmail.com
> >> > > > >> > > > >> > > > > > >
> >> > > > >> > > > >> > > > > > > >> > wrote:
> >> > > > >> > > > >> > > > > > > >> > > > >
> >> > > > >> > > > >> > > > > > > >> > > > > > Hey Jun,
> >> > > > >> > > > >> > > > > > > >> > > > > >
> >> > > > >> > > > >> > > > > > > >> > > > > > Thanks much for the
> >> explanation.
> >> > > > >> > > > >> > > > > > > >> > > > > >
> > I understand the advantage of partition_epoch over metadata_epoch. My
> > current concern is that the use of leader_epoch and the partition_epoch
> > requires considerable changes to the consumer's public API to take care
> > of the case where the user stores offsets externally. For example,
> > consumer.commitSync(..) would have to take a map whose value is
> > <offset, metadata, leader epoch, partition epoch>. consumer.seek(...)
> > would also need leader_epoch and partition_epoch as parameters.
> > Technically we can probably still make it work in a backward compatible
> > manner after careful design and discussion. But these changes can make
> > the consumer's interface unnecessarily complex for the many users who
> > do not store offsets externally.
> >
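A rough sketch of the API surface such a change would imply; the class and
method names below are invented for illustration and are not the KIP's
proposed API:

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical shape of a consumer API that carries epochs alongside
    // offsets for users who store offsets outside Kafka (illustration only).
    public final class OffsetWithEpochs {
        public final long offset;
        public final String metadata;
        public final int leaderEpoch;     // epoch of the leader that wrote the record at this offset
        public final int partitionEpoch;  // epoch assigned when the partition was created

        public OffsetWithEpochs(long offset, String metadata,
                                int leaderEpoch, int partitionEpoch) {
            this.offset = offset;
            this.metadata = metadata;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }
    }

    // Hypothetical extra surface on the consumer (illustration only).
    interface EpochAwareConsumer {
        // commitSync would need the epochs in addition to offset and metadata
        void commitSync(Map<TopicPartition, OffsetWithEpochs> offsets);

        // seek would need the epochs as well when restoring an external offset
        void seek(TopicPartition partition, OffsetWithEpochs position);
    }

Every application that stores offsets externally would have to persist and
restore the two extra epochs, which is the added complexity being weighed
here.
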
> > After thinking more about it, we can address all the problems discussed
> > by only using the metadata_epoch, without introducing leader_epoch or
> > the partition_epoch. The current KIP describes the changes to the
> > consumer API and how the new API can be used if the user stores offsets
> > externally. In order to address the scenario you described earlier, we
> > can include metadata_epoch in the FetchResponse and the
> > LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch
> > from all the FetchResponses it has received. The metadata_epoch
> > committed with the offset, either within or outside Kafka, should be
> > the largest metadata_epoch across all FetchResponses and
> > MetadataResponses ever received by this consumer.
> >
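As a concrete reading of the paragraph above, a minimal sketch of the
consumer-side bookkeeping this implies (names invented for illustration):

    // Illustration only: remember the largest metadata_epoch seen in any
    // MetadataResponse or FetchResponse, and use that value when committing
    // offsets, whether inside or outside Kafka.
    public final class MetadataEpochTracker {
        private int largestMetadataEpoch = -1;

        // Called whenever a response carrying a metadata_epoch is processed.
        public synchronized void onEpochObserved(int metadataEpoch) {
            if (metadataEpoch > largestMetadataEpoch) {
                largestMetadataEpoch = metadataEpoch;
            }
        }

        // The epoch to store next to the committed offset.
        public synchronized int epochToCommit() {
            return largestMetadataEpoch;
        }
    }
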
> > The drawback of using only the metadata_epoch is that we can not always
> > do the smart offset reset in case of unclean leader election, which you
> > mentioned earlier. But in most cases, unclean leader election probably
> > happens when the consumer is not rebalancing/restarting. In these
> > cases, either the consumer is not directly affected by unclean leader
> > election since it is not consuming from the end of the log, or the
> > consumer can derive the leader_epoch from the most recent message
> > received before it sees OffsetOutOfRangeException. So I am not sure it
> > is worth adding the leader_epoch to the consumer API to address the
> > remaining corner case. What do you think?
> >
> > Thanks,
> > Dong
> >
> >
> > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the reply.
> > >
> > > To solve the topic recreation issue, we could use either a global
> > > metadata version or a partition level epoch. But either one will be a
> > > new concept, right? To me, the latter seems more natural. It also
> > > makes it easier to detect if a consumer's offset is still valid after
> > > a topic is recreated. As you pointed out, we don't need to store the
> > > partition epoch in the message. The following is what I am thinking.
> > > When a partition is created, we can assign a partition epoch from an
> > > ever-increasing global counter and store it in
> > > /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition
> > > epoch is propagated to every broker. The consumer will be tracking a
> > > tuple of <offset, leader epoch, partition epoch> for offsets. If a
> > > topic is recreated, it's possible that a consumer's offset and leader
> > > epoch still match those in the broker, but the partition epoch won't.
> > > In this case, we can potentially still treat the consumer's offset as
> > > out of range and reset the offset based on the offset reset policy in
> > > the consumer. This seems harder to do with a global metadata version.
> > >
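A small sketch of the validity check described above; the names are invented
for illustration and the real logic would live in the consumer and broker
rather than a helper class:

    // Illustration only: a committed position is usable after a restart only
    // if the partition epoch it was committed against still matches the
    // partition's current epoch.
    public final class CommittedPosition {
        public final long offset;
        public final int leaderEpoch;
        public final int partitionEpoch;  // from the global counter assigned at partition creation

        public CommittedPosition(long offset, int leaderEpoch, int partitionEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }

        // currentPartitionEpoch comes from the broker's current metadata.
        public boolean isStillValid(int currentPartitionEpoch) {
            // After a delete/re-create, offset and leader epoch may happen to
            // match the new log, but the partition epoch will not.
            return this.partitionEpoch == currentPartitionEpoch;
        }
    }

When the check fails, the consumer would treat the position as out of range
and fall back to its auto.offset.reset policy.
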
> > > Jun
> > >
> > >
> > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > This is a very good example. After thinking through this in detail,
> > > > I agree that we need to commit offset with leader epoch in order to
> > > > address this example.
> > > >
> > > > I think the remaining question is how to address the scenario where
> > > > the topic is deleted and re-created. One possible solution is to
> > > > commit the offset with both the leader epoch and the metadata
> > > > version. The logic and the implementation of this solution do not
> > > > require a new concept (e.g. partition epoch) and do not require any
> > > > change to the message format or leader epoch. It also allows us to
> > > > order the metadata in a straightforward manner, which may be useful
> > > > in the future. So it may be a better solution than generating a
> > > > random partition epoch every time we create a partition. Does this
> > > > sound reasonable?
> > > >
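For concreteness, a sketch of what committing both values to an external
store could look like, and the restart-time check it enables (all names
invented for illustration):

    // Illustration only: the record an application might persist in its own
    // offset store, plus the restart-time check that the consumer's cached
    // metadata is at least as new as what the offset was committed against.
    public final class ExternalOffsetExample {
        public static final class StoredOffset {
            public final long offset;
            public final int leaderEpoch;
            public final int metadataVersion;  // globally ordered, monotonically increasing

            public StoredOffset(long offset, int leaderEpoch, int metadataVersion) {
                this.offset = offset;
                this.leaderEpoch = leaderEpoch;
                this.metadataVersion = metadataVersion;
            }
        }

        // If this returns true, the consumer should refresh metadata before
        // seeking to the stored offset.
        public static boolean needsMetadataRefresh(StoredOffset saved,
                                                   int currentMetadataVersion) {
            return saved.metadataVersion > currentMetadataVersion;
        }
    }
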
> > > > Previously one concern with using the metadata version is that the
> > > > consumer will be forced to refresh metadata even if the metadata
> > > > version is increased due to topics that the consumer is not
> > > > interested in. Now I realized that this is probably not a problem.
> > > > Currently the client will refresh metadata either due to
> > > > InvalidMetadataException in the response from the broker or due to
> > > > metadata expiry. The addition of the metadata version should not
> > > > increase the overhead of metadata refresh caused by
> > > > InvalidMetadataException. If the client refreshes metadata due to
> > > > expiry and it receives a metadata response whose version is lower
> > > > than the current metadata version, we can reject the metadata but
> > > > still reset the metadata age, which essentially keeps the existing
> > > > behavior in the client.
> > > >
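The rejection rule in the last sentence could look roughly like this on the
client side (a sketch with invented names, not the actual client code):

    // Illustration only: a metadata response triggered by expiry is rejected
    // if its version is lower than what the client already has, but the
    // metadata age is reset either way, so the client does not keep
    // re-fetching and the existing behavior is preserved.
    public final class ClientMetadataState {
        private int currentVersion = -1;
        private long lastRefreshMs = 0L;

        public synchronized boolean onMetadataResponse(int responseVersion, long nowMs) {
            lastRefreshMs = nowMs;              // always reset the age
            if (responseVersion < currentVersion) {
                return false;                   // stale: keep what we already have
            }
            currentVersion = responseVersion;   // accept newer (or equal) metadata
            return true;
        }

        public synchronized long millisSinceLastRefresh(long nowMs) {
            return nowMs - lastRefreshMs;
        }
    }
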
> > > > Thanks much,
> > > > Dong
> > > >
