Hey Jun,

Thanks much for the comments. Please see my replies inline.

On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Looks good to me overall. Just a few minor
> comments.
>
> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition): It
> seems that there is no need to return metadata. We probably want to return
> sth like OffsetAndEpoch.
>

Previously I thought we might want to re-use the existing class to keep the
consumer interface simpler. I have updated the KIP to add a new class
OffsetAndOffsetEpoch. I didn't use the name OffsetAndEpoch because users may
confuse it with OffsetEpoch. Does this sound OK?
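
To make this concrete, the shape I have in mind is roughly the following
(an illustrative sketch only; the exact definition is the one in the KIP,
and the accessor names here are just assumptions):

// Illustrative sketch; accessor names are hypothetical.
public class OffsetAndOffsetEpoch {
    private final long offset;
    private final OffsetEpoch offsetEpoch;

    public OffsetAndOffsetEpoch(long offset, OffsetEpoch offsetEpoch) {
        this.offset = offset;
        this.offsetEpoch = offsetEpoch;
    }

    public long offset() { return offset; }

    public OffsetEpoch offsetEpoch() { return offsetEpoch; }
}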


>
> 61. Should we store partition_epoch in
> /brokers/topics/[topic]/partitions/[partitionId] in ZK?
>

I have considered this. I think the advantage of adding the
partition->partition_epoch map to the existing znode
/brokers/topics/[topic]/partitions is that the controller only needs to read
one znode per topic to get its partition_epoch information. Otherwise the
controller may need to read one extra znode per partition to get the same
information.

When we delete partitions or expand the partition count of a topic, someone
needs to modify the partition->partition_epoch map in the znode
/brokers/topics/[topic]/partitions. This may seem a bit more complicated
than simply adding or deleting the znode
/brokers/topics/[topic]/partitions/[partitionId]. But the complexity is
probably similar to the existing operation of modifying the
partition->replica_list mapping in the znode /brokers/topics/[topic]. So I
am not sure it is better to store the partition_epoch in
/brokers/topics/[topic]/partitions/[partitionId]. What do you think?
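
To make the comparison concrete, the two layouts would look roughly like the
following (the znode contents and field names below are purely illustrative
assumptions, not a proposed schema):

Option A (what I described above), one znode per topic:
  /brokers/topics/[topic]/partitions
    -> {"partition_epochs": {"0": 5, "1": 5, "2": 7}}

Option B, one znode per partition:
  /brokers/topics/[topic]/partitions/0 -> {"partition_epoch": 5}
  /brokers/topics/[topic]/partitions/1 -> {"partition_epoch": 5}
  /brokers/topics/[topic]/partitions/2 -> {"partition_epoch": 7}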


>
> 62. For checking outdated metadata in the client, we probably want to add
> when max_partition_epoch will be used.
>

The max_partition_epoch is used in the Proposed Changes -> Client's
metadata refresh section to determine whether the cached metadata is
outdated, and that formula is referenced and re-used in other sections for
the same check. Does this formula look OK?
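
To illustrate the kind of comparison I mean, here is a minimal sketch,
assuming the client remembers the largest max_partition_epoch and the
per-partition partition_epoch it has seen so far (all names below are
hypothetical; the authoritative formula is the one in the KIP):

import java.util.Map;

// Illustrative sketch of the staleness check; not the KIP's exact formula.
final class MetadataStalenessCheck {

    // Returns true if freshly fetched metadata is older than what the
    // client has already seen.
    static boolean isOutdated(int fetchedMaxPartitionEpoch,
                              Map<Integer, Integer> fetchedPartitionEpochs,
                              int knownMaxPartitionEpoch,
                              Map<Integer, Integer> knownPartitionEpochs) {
        // The fetched metadata predates a topic create/delete the client
        // already knows about.
        if (fetchedMaxPartitionEpoch < knownMaxPartitionEpoch)
            return true;
        // A partition was re-created after this metadata was generated.
        for (Map.Entry<Integer, Integer> known : knownPartitionEpochs.entrySet()) {
            Integer fetched = fetchedPartitionEpochs.get(known.getKey());
            if (fetched != null && fetched < known.getValue())
                return true;
        }
        return false;
    }
}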


>
> 63. "The leader_epoch should be the largest leader_epoch of messages whose
> offset < the commit offset. If no message has been consumed since consumer
> initialization, the leader_epoch from seek(...) or OffsetFetchResponse
> should be used. The partition_epoch should be read from the last
> FetchResponse corresponding to the given partition and commit offset. ":
> leader_epoch and partition_epoch are associated with an offset. So, if no
> message is consumed, there is no offset and therefore there is no need to
> read leader_epoch and partition_epoch. Also, the leader_epoch associated
> with the offset should just come from the messages returned in the fetch
> response.
>

I am thinking that, if a user calls seek(...) and commitSync(...) without
consuming any messages, we should re-use the leader_epoch and
partition_epoch provided by seek(...) in the OffsetCommitRequest. And if
messages have been successfully consumed, then the leader_epoch will come
from the messages returned in the fetch response. The condition "messages
whose offset < the commit offset" is needed to take care of log-compacted
topics, which may have offset gaps due to log cleaning.

Did I miss something here? Or should I rephrase the paragraph to make it
less confusing?
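
To make the intended behavior concrete, here is a minimal sketch of how the
consumer could pick the leader_epoch for the OffsetCommitRequest (the helper
and parameter names are hypothetical, not part of the proposed API):

import java.util.Optional;

// Illustrative helper; names are hypothetical.
final class CommitEpochSelection {

    // leader_epoch to put in the OffsetCommitRequest for one partition:
    // - if messages have been consumed, the largest leader_epoch among
    //   consumed messages whose offset < the commit offset (offsets may
    //   have gaps on compacted topics);
    // - otherwise, the leader_epoch from seek(...) or the OffsetFetchResponse.
    static int leaderEpochForCommit(Optional<Integer> largestConsumedLeaderEpochBelowCommitOffset,
                                    int leaderEpochFromSeekOrOffsetFetch) {
        return largestConsumedLeaderEpochBelowCommitOffset.orElse(leaderEpochFromSeekOrOffsetFetch);
    }
}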


> 64. Could you include the public methods in the OffsetEpoch class?
>

I mistakenly deleted the definition of the OffsetEpoch class from the KIP.
I have just added it back with the public methods. Could you take another
look?
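
For reference, the shape follows the earlier suggestion in this thread: an
opaque wrapper around (partition_epoch, leader_epoch) with roughly the
following public methods (the KIP text is authoritative):

OffsetEpoch {
  static OffsetEpoch decode(byte[] bytes);

  public byte[] encode();

  public String toString();
}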


>
> Jun
>
>
> On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks much. I agree that we can not rely on committed offsets to be
> always
> > deleted when we delete topic. So it is necessary to use a per-partition
> > epoch that does not change unless this partition is deleted. I also agree
> > that it is very nice to be able to uniquely identify a message with
> > (offset, leader_epoch, partition_epoch) in face of potential topic
> deletion
> > and unclean leader election.
> >
> > I agree with all your comments. And I have updated the KIP based on our
> > latest discussion. In addition, I added InvalidPartitionEpochException
> > which will be thrown by consumer.poll() if the partition_epoch associated
> > with the partition, which can be given to consumer using seek(...), is
> > different from the partition_epoch in the FetchResponse.
> >
> > Can you take another look at the latest KIP?
> >
> > Thanks!
> > Dong
> >
> >
> >
> > On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > My replies are the following.
> > >
> > > 60. What you described could also work. The drawback is that we will be
> > > unnecessarily changing the partition epoch when a partition hasn't
> really
> > > changed. I was imagining that the partition epoch will be stored in
> > > /brokers/topics/[topic]/partitions/[partitionId], instead of at the
> > topic
> > > level. So, not sure if ZK size limit is an issue.
> > >
> > > 61, 62 and 65. To me, the offset + offset_epoch is a unique identifier
> > for
> > > a message. So, if a message hasn't changed, the offset and the
> associated
> > > offset_epoch ideally should remain the same (it will be kind of weird
> if
> > > two consumer apps save the offset on the same message, but the
> > offset_epoch
> > > are different). partition_epoch + leader_epoch give us that.
> > global_epoch +
> > > leader_epoch don't. If we use this approach, we can solve not only the
> > > problem that you have identified, but also other problems when there is
> > > data loss or topic re-creation more reliably. For example, in the
> future,
> > > if we include the partition_epoch and leader_epoch in the fetch
> request,
> > > the server can do a more reliable check of whether that offset is valid
> > or
> > > not. I am not sure that we can rely upon all external offsets to be
> > removed
> > > on topic deletion. For example, a topic may be deleted by an admin who
> > may
> > > not know all the applications.
> > >
> > > If we agree on the above, the second question is then how to reliably
> > > propagate the partition_epoch and the leader_epoch to the consumer when
> > > there are leader or partition changes. The leader_epoch comes from the
> > > message, which is reliable. So, I was suggesting that when we store an
> > > offset, we can just store the leader_epoch from the message set
> > containing
> > > that offset. Similarly, I was thinking that if the partition_epoch is
> in
> > > the fetch response, we can propagate partition_epoch reliably where is
> > > partition_epoch change.
> > >
> > > 63. My point is that once a leader is producing a message in the new
> > > partition_epoch, ideally, we should associate the new offsets with the
> > new
> > > partition_epoch. Otherwise, the offset_epoch won't be the correct
> unique
> > > identifier (useful for solving other problems mentioned above). I was
> > > originally thinking that the leader will include the partition_epoch in
> > the
> > > metadata cache in the fetch response. It's just that right now,
> metadata
> > > cache is updated on UpdateMetadataRequest, which typically happens
> after
> > > the LeaderAndIsrRequest. Another approach is for the leader to cache
> the
> > > partition_epoch in the Partition object and return that (instead of the
> > one
> > > in metadata cache) in the fetch response.
> > >
> > > 65. It seems to me that the global_epoch and the partition_epoch have
> > > different purposes. A partition_epoch has the benefit that it (1) can
> be
> > > used to form a unique identifier for a message and (2) can be used to
> > > solve other
> > > corner case problems in the future. I am not sure having just a
> > > global_epoch can achieve these. global_epoch is useful to determine
> which
> > > version of the metadata is newer, especially with topic deletion.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Regarding the use of the global epoch in 65), it is very similar to
> the
> > > > proposal of the metadata_epoch we discussed earlier. The main
> > difference
> > > is
> > > > that this epoch is incremented when we create/expand/delete topic and
> > > does
> > > > not change when controller re-send metadata.
> > > >
> > > > I looked at our previous discussion. It seems that we prefer
> > > > partition_epoch over the metadata_epoch because 1) we prefer not to
> > have
> > > an
> > > > ever growing metadata_epoch and 2) we can reset offset better when
> > topic
> > > is
> > > > re-created. The use of global topic_epoch avoids the drawback of an
> > ever
> > > > quickly ever growing metadata_epoch. Though the global epoch does not
> > > allow
> > > > us to recognize the invalid offset committed before the topic
> > > re-creation,
> > > > we can probably just delete the offset when we delete a topic. Thus I
> > am
> > > > not very sure whether it is still worthwhile to have a per-partition
> > > > partition_epoch if the metadata already has the global epoch.
> > > >
> > > >
> > > > On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > Thanks so much. These comments very useful. Please see below my
> > > comments.
> > > > >
> > > > > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > >> Hi, Dong,
> > > > >>
> > > > >> Thanks for the updated KIP. A few more comments.
> > > > >>
> > > > >> 60. Perhaps having a partition epoch is more flexible since in the
> > > > future,
> > > > >> we may support deleting a partition as well.
> > > > >>
> > > > >
> > > > > Yeah I have considered this. I think we can probably still support
> > > > > deleting a partition by using the topic_epoch -- when partition of
> a
> > > > topic
> > > > > is deleted or created, epoch of all partitions of this topic will
> be
> > > > > incremented by 1. Therefore, if that partition is re-created later,
> > the
> > > > > epoch of that partition will still be larger than its epoch before
> > the
> > > > > deletion, which still allows the client to order the metadata for
> the
> > > > > purpose of this KIP. Does this sound reasonable?
> > > > >
> > > > > The advantage of using topic_epoch instead of partition_epoch is
> that
> > > the
> > > > > size of the /brokers/topics/[topic] znode and request/response size
> > can
> > > > be
> > > > > smaller. We have a limit on the maximum size of znode (typically
> > 1MB).
> > > > Use
> > > > > partition epoch can effectively reduce the number of partitions
> that
> > > can
> > > > be
> > > > > described by the /brokers/topics/[topic] znode.
> > > > >
> > > > > One use-case of partition_epoch for client to detect that the
> > committed
> > > > > offset, either from kafka offset topic or from the external store
> is
> > > > > invalid after partition deletion and re-creation. However, it seems
> > > that
> > > > we
> > > > > can also address this use-case with other approaches. For example,
> > when
> > > > > AdminClient deletes partitions, it can also delete the committed
> > > offsets
> > > > > for those partitions from the offset topic. If user stores offset
> > > > > externally, it might make sense for user to similarly remove
> offsets
> > of
> > > > > related partitions after these partitions are deleted. So I am not
> > sure
> > > > > that we should use partition_epoch in this KIP.
> > > > >
> > > > >
> > > > >>
> > > > >> 61. It seems that the leader epoch returned in the position() call
> > > > should
> > > > >> the the leader epoch returned in the fetch response, not the one
> in
> > > the
> > > > >> metadata cache of the client.
> > > > >
> > > > >
> > > > > I think this is a good idea. Just to double check, this change does
> > not
> > > > > affect the correctness or performance of this KIP. But it can be
> > useful
> > > > if
> > > > > we want to use the leader_epoch to better handle the offset rest in
> > > case
> > > > of
> > > > > unclean leader election, which is listed in the future work. Is
> this
> > > > > understanding correct?
> > > > >
> > > > > I have updated the KIP to specify that the leader_epoch returned by
> > > > > position() should be the largest leader_epoch of those already
> > consumed
> > > > > messages whose offset < position. If no message has been consumed
> > since
> > > > > consumer initialization, the leader_epoch from seek() or
> > > > > OffsetFetchResponse should be used. The offset included in the
> > > > > OffsetCommitRequest will also be determined in the similar manner.
> > > > >
> > > > >
> > > > >>
> > > > >> 62. I am wondering if we should return the partition epoch in the
> > > fetch
> > > > >> response as well. In the current proposal, if a topic is recreated
> > and
> > > > the
> > > > >> new leader is on the same broker as the old one, there is nothing
> to
> > > > force
> > > > >> the metadata refresh in the client. So, the client may still
> > associate
> > > > the
> > > > >> offset with the old partition epoch.
> > > > >>
> > > > >
> > > > > Could you help me understand the problem if a client associates old
> > > > > partition_epoch (or the topic_epoch as of the current KIP) with the
> > > > offset?
> > > > > The main purpose of the topic_epoch is to be able to drop
> > leader_epoch
> > > > to 0
> > > > > after a partition is deleted and re-created. I guess you may be
> > > thinking
> > > > > about using the partition_epoch to detect that the committed offset
> > is
> > > > > invalid? In that case, I am wondering if the alternative approach
> > > > described
> > > > > in 60) would be reasonable.
> > > > >
> > > > >
> > > > >>
> > > > >> 63. There is some subtle coordination between the
> > LeaderAndIsrRequest
> > > > and
> > > > >> UpdateMetadataRequest. Currently, when a leader changes, the
> > > controller
> > > > >> first sends the LeaderAndIsrRequest to the assigned replicas and
> the
> > > > >> UpdateMetadataRequest to every broker. So, there could be a small
> > > window
> > > > >> when the leader already receives the new partition epoch in the
> > > > >> LeaderAndIsrRequest, but the metadata cache in the broker hasn't
> > been
> > > > >> updated with the latest partition epoch. Not sure what's the best
> > way
> > > to
> > > > >> address this issue. Perhaps we can update the metadata cache on
> the
> > > > broker
> > > > >> with both LeaderAndIsrRequest and UpdateMetadataRequest. The
> > challenge
> > > > is
> > > > >> that the two have slightly different data. For example, only the
> > > latter
> > > > >> has
> > > > >> all endpoints.
> > > > >>
> > > > >
> > > > > I am not sure whether this is a problem. Could you explain a bit
> more
> > > > what
> > > > > specific problem this small window can cause?
> > > > >
> > > > > Since client can fetch metadata from any broker in the cluster, and
> > > given
> > > > > that different brokers receive request (e.g. LeaderAndIsrRequest
> and
> > > > > UpdateMetadataRequest) in arbitrary order, the metadata received by
> > > > client
> > > > > can be in arbitrary order (either newer or older) compared to the
> > > > broker's
> > > > > leadership state even if a given broker receives
> LeaderAndIsrRequest
> > > and
> > > > > UpdateMetadataRequest simultaneously. So I am not sure it is useful
> > to
> > > > > update broker's cache with LeaderAndIsrRequest.
> > > > >
> > > > >
> > > > >> 64. The enforcement of leader epoch in Offset commit: We allow a
> > > > consumer
> > > > >> to set an arbitrary offset. So it's possible for offsets or leader
> > > epoch
> > > > >> to
> > > > >> go backwards. I am not sure if we could always enforce that the
> > leader
> > > > >> epoch only goes up on the broker.
> > > > >>
> > > > >
> > > > > Sure. I have removed this check from the KIP.
> > > > >
> > > > > BTW, we can probably still ensure that the leader_epoch always
> > increase
> > > > if
> > > > > the leader_epoch used with offset commit is the max(leader_epoch of
> > the
> > > > > message with offset = the committed offset - 1, the largest known
> > > > > leader_epoch from the metadata). But I don't have a good use-case
> for
> > > > this
> > > > > alternative definition. So I choose the keep the KIP simple by
> > > requiring
> > > > > leader_epoch to always increase.
> > > > >
> > > > >
> > > > >> 65. Good point on handling missing partition epoch due to topic
> > > > deletion.
> > > > >> Another potential way to address this is to additionally propagate
> > the
> > > > >> global partition epoch to brokers and the clients. This way, when
> a
> > > > >> partition epoch is missing, we can use the global partition epoch
> to
> > > > >> reason
> > > > >> about which metadata is more recent.
> > > > >>
> > > > >
> > > > > This is a great idea. The global epoch can be used to order the
> > > metadata
> > > > > and help us recognize the more recent metadata if a topic (or
> > > partition)
> > > > is
> > > > > deleted and re-created.
> > > > >
> > > > > Actually, it seems we only need to propagate the global epoch to
> > > brokers
> > > > > and clients without propagating this epoch on a per-topic or
> > > > per-partition
> > > > > basic. Doing so would simply interface changes made this KIP. Does
> > this
> > > > > approach sound reasonable?
> > > > >
> > > > >
> > > > >> 66. A client may also get an offset by time using the
> > offsetForTimes()
> > > > >> api.
> > > > >> So, we probably want to include offsetInternalMetadata in
> > > > >> OffsetAndTimestamp
> > > > >> as well.
> > > > >>
> > > > >
> > > > > You are right. This probably also requires us to change the
> > > > > ListOffsetRequest as well. I will update the KIP after we agree on
> > the
> > > > > solution for 65).
> > > > >
> > > > >
> > > > >>
> > > > >> 67. InteralMetadata can be a bit confusing with the metadata field
> > > > already
> > > > >> there. Perhaps we can just call it OffsetEpoch. It might be useful
> > to
> > > > make
> > > > >> OffsetEpoch printable at least for debugging purpose. Once you do
> > > that,
> > > > we
> > > > >> are already exposing the internal fields. So, not sure if it's
> worth
> > > > >> hiding
> > > > >> them. If we do want to hide them, perhaps we can have sth like the
> > > > >> following. The binary encoding is probably more efficient than
> JSON
> > > for
> > > > >> external storage.
> > > > >>
> > > > >> OffsetEpoch {
> > > > >>  static OffsetEpoch decode(byte[]);
> > > > >>
> > > > >>   public byte[] encode();
> > > > >>
> > > > >>   public String toString();
> > > > >> }
> > > > >>
> > > > >
> > > > > Thanks much. I like this solution. I have updated the KIP
> > accordingly.
> > > > >
> > > > >
> > > > >
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >>
> > > > >> > Hey Jason,
> > > > >> >
> > > > >> > Certainly. This sounds good. I have updated the KIP to clarity
> > that
> > > > the
> > > > >> > global epoch will be incremented by 1 each time a topic is
> > deleted.
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Dong
> > > > >> >
> > > > >> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <
> > ja...@confluent.io
> > > >
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi Dong,
> > > > >> > >
> > > > >> > >
> > > > >> > > I think your approach will allow user to distinguish between
> the
> > > > >> metadata
> > > > >> > > > before and after the topic deletion. I also agree that this
> > can
> > > be
> > > > >> > > > potentially be useful to user. I am just not very sure
> whether
> > > we
> > > > >> > already
> > > > >> > > > have a good use-case to make the additional complexity
> > > worthwhile.
> > > > >> It
> > > > >> > > seems
> > > > >> > > > that this feature is kind of independent of the main problem
> > of
> > > > this
> > > > >> > KIP.
> > > > >> > > > Could we add this as a future work?
> > > > >> > >
> > > > >> > >
> > > > >> > > Do you think it's fair if we bump the topic epoch on deletion
> > and
> > > > >> leave
> > > > >> > > propagation of the epoch for deleted topics for future work? I
> > > don't
> > > > >> > think
> > > > >> > > this adds much complexity and it makes the behavior
> consistent:
> > > > every
> > > > >> > topic
> > > > >> > > mutation results in an epoch bump.
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > > Jason
> > > > >> > >
> > > > >> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com
> >
> > > > wrote:
> > > > >> > >
> > > > >> > > > Hey Ismael,
> > > > >> > > >
> > > > >> > > > I guess we actually need user to see this field so that user
> > can
> > > > >> store
> > > > >> > > this
> > > > >> > > > value in the external store together with the offset. We
> just
> > > > prefer
> > > > >> > the
> > > > >> > > > value to be opaque to discourage most users from
> interpreting
> > > this
> > > > >> > value.
> > > > >> > > > One more advantage of using such an opaque field is to be
> able
> > > to
> > > > >> > evolve
> > > > >> > > > the information (or schema) of this value without changing
> > > > consumer
> > > > >> API
> > > > >> > > in
> > > > >> > > > the future.
> > > > >> > > >
> > > > >> > > > I also thinking it is probably OK for user to be able to
> > > interpret
> > > > >> this
> > > > >> > > > value, particularly for those advanced users.
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > > Dong
> > > > >> > > >
> > > > >> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <
> > ism...@juma.me.uk>
> > > > >> wrote:
> > > > >> > > >
> > > > >> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <
> > > > >> ja...@confluent.io>
> > > > >> > > > > wrote:
> > > > >> > > > > >
> > > > >> > > > > > class OffsetAndMetadata {
> > > > >> > > > > >   long offset;
> > > > >> > > > > >   byte[] offsetMetadata;
> > > > >> > > > > >   String metadata;
> > > > >> > > > > > }
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > > Admittedly, the naming is a bit annoying, but we can
> > > probably
> > > > >> come
> > > > >> > up
> > > > >> > > > > with
> > > > >> > > > > > something better. Internally the byte array would have a
> > > > >> version.
> > > > >> > If
> > > > >> > > in
> > > > >> > > > > the
> > > > >> > > > > > future we have anything else we need to add, we can
> update
> > > the
> > > > >> > > version
> > > > >> > > > > and
> > > > >> > > > > > we wouldn't need any new APIs.
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > > > We can also add fields to a class in a compatible way. So,
> > it
> > > > >> seems
> > > > >> > to
> > > > >> > > me
> > > > >> > > > > that the main advantage of the byte array is that it's
> > opaque
> > > to
> > > > >> the
> > > > >> > > > user.
> > > > >> > > > > Is that correct? If so, we could also add any opaque
> > metadata
> > > > in a
> > > > >> > > > subclass
> > > > >> > > > > so that users don't even see it (unless they cast it, but
> > then
> > > > >> > they're
> > > > >> > > on
> > > > >> > > > > their own).
> > > > >> > > > >
> > > > >> > > > > Ismael
> > > > >> > > > >
> > > > >> > > > > The corresponding seek() and position() APIs might look
> > > > something
> > > > >> > like
> > > > >> > > > > this:
> > > > >> > > > > >
> > > > >> > > > > > void seek(TopicPartition partition, long offset, byte[]
> > > > >> > > > offsetMetadata);
> > > > >> > > > > > byte[] positionMetadata(TopicPartition partition);
> > > > >> > > > > >
> > > > >> > > > > > What do you think?
> > > > >> > > > > >
> > > > >> > > > > > Thanks,
> > > > >> > > > > > Jason
> > > > >> > > > > >
> > > > >> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <
> > > lindon...@gmail.com
> > > > >
> > > > >> > > wrote:
> > > > >> > > > > >
> > > > >> > > > > > > Hey Jun, Jason,
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks much for all the feedback. I have updated the
> KIP
> > > > >> based on
> > > > >> > > the
> > > > >> > > > > > > latest discussion. Can you help check whether it looks
> > > good?
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks,
> > > > >> > > > > > > Dong
> > > > >> > > > > > >
> > > > >> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <
> > > > lindon...@gmail.com
> > > > >> >
> > > > >> > > > wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > Hey Jun,
> > > > >> > > > > > > >
> > > > >> > > > > > > > Hmm... thinking about this more, I am not sure that
> > the
> > > > >> > proposed
> > > > >> > > > API
> > > > >> > > > > is
> > > > >> > > > > > > > sufficient. For users that store offset externally,
> we
> > > > >> probably
> > > > >> > > > need
> > > > >> > > > > > > extra
> > > > >> > > > > > > > API to return the leader_epoch and partition_epoch
> for
> > > all
> > > > >> > > > partitions
> > > > >> > > > > > > that
> > > > >> > > > > > > > consumers are consuming. I suppose these users
> > currently
> > > > use
> > > > >> > > > > position()
> > > > >> > > > > > > to
> > > > >> > > > > > > > get the offset. Thus we probably need a new method
> > > > >> > > > > > positionWithEpoch(..)
> > > > >> > > > > > > to
> > > > >> > > > > > > > return <offset, partition_epoch, leader_epoch>. Does
> > > this
> > > > >> sound
> > > > >> > > > > > > reasonable?
> > > > >> > > > > > > >
> > > > >> > > > > > > > Thanks,
> > > > >> > > > > > > > Dong
> > > > >> > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <
> > > j...@confluent.io
> > > > >
> > > > >> > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > >> Hi, Dong,
> > > > >> > > > > > > >>
> > > > >> > > > > > > >> Yes, that's what I am thinking. OffsetEpoch will be
> > > > >> composed
> > > > >> > of
> > > > >> > > > > > > >> (partition_epoch,
> > > > >> > > > > > > >> leader_epoch).
> > > > >> > > > > > > >>
> > > > >> > > > > > > >> Thanks,
> > > > >> > > > > > > >>
> > > > >> > > > > > > >> Jun
> > > > >> > > > > > > >>
> > > > >> > > > > > > >>
> > > > >> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <
> > > > >> lindon...@gmail.com
> > > > >> > >
> > > > >> > > > > wrote:
> > > > >> > > > > > > >>
> > > > >> > > > > > > >> > Hey Jun,
> > > > >> > > > > > > >> >
> > > > >> > > > > > > >> > Thanks much. I like the the new API that you
> > > proposed.
> > > > I
> > > > >> am
> > > > >> > > not
> > > > >> > > > > sure
> > > > >> > > > > > > >> what
> > > > >> > > > > > > >> > you exactly mean by offset_epoch. I suppose that
> we
> > > can
> > > > >> use
> > > > >> > > the
> > > > >> > > > > pair
> > > > >> > > > > > > of
> > > > >> > > > > > > >> > (partition_epoch, leader_epoch) as the
> > offset_epoch,
> > > > >> right?
> > > > >> > > > > > > >> >
> > > > >> > > > > > > >> > Thanks,
> > > > >> > > > > > > >> > Dong
> > > > >> > > > > > > >> >
> > > > >> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <
> > > > >> j...@confluent.io>
> > > > >> > > > wrote:
> > > > >> > > > > > > >> >
> > > > >> > > > > > > >> > > Hi, Dong,
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> > > Got it. The api that you proposed works. The
> > > question
> > > > >> is
> > > > >> > > > whether
> > > > >> > > > > > > >> that's
> > > > >> > > > > > > >> > the
> > > > >> > > > > > > >> > > api that we want to have in the long term. My
> > > concern
> > > > >> is
> > > > >> > > that
> > > > >> > > > > > while
> > > > >> > > > > > > >> the
> > > > >> > > > > > > >> > api
> > > > >> > > > > > > >> > > change is simple, the new api seems harder to
> > > explain
> > > > >> and
> > > > >> > > use.
> > > > >> > > > > For
> > > > >> > > > > > > >> > example,
> > > > >> > > > > > > >> > > a consumer storing offsets externally now needs
> > to
> > > > call
> > > > >> > > > > > > >> > > waitForMetadataUpdate() after calling seek().
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> > > An alternative approach is to make the
> following
> > > > >> > compatible
> > > > >> > > > api
> > > > >> > > > > > > >> changes
> > > > >> > > > > > > >> > in
> > > > >> > > > > > > >> > > Consumer.
> > > > >> > > > > > > >> > > * Add an additional OffsetEpoch field in
> > > > >> > OffsetAndMetadata.
> > > > >> > > > (no
> > > > >> > > > > > need
> > > > >> > > > > > > >> to
> > > > >> > > > > > > >> > > change the CommitSync() api)
> > > > >> > > > > > > >> > > * Add a new api seek(TopicPartition partition,
> > long
> > > > >> > offset,
> > > > >> > > > > > > >> OffsetEpoch
> > > > >> > > > > > > >> > > offsetEpoch). We can potentially deprecate the
> > old
> > > > api
> > > > >> > > > > > > >> > seek(TopicPartition
> > > > >> > > > > > > >> > > partition, long offset) in the future.
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> > > The alternative approach has similar amount of
> > api
> > > > >> changes
> > > > >> > > as
> > > > >> > > > > > yours
> > > > >> > > > > > > >> but
> > > > >> > > > > > > >> > has
> > > > >> > > > > > > >> > > the following benefits.
> > > > >> > > > > > > >> > > 1. The api works in a similar way as how offset
> > > > >> management
> > > > >> > > > works
> > > > >> > > > > > now
> > > > >> > > > > > > >> and
> > > > >> > > > > > > >> > is
> > > > >> > > > > > > >> > > probably what we want in the long term.
> > > > >> > > > > > > >> > > 2. It can reset offsets better when there is
> data
> > > > loss
> > > > >> due
> > > > >> > > to
> > > > >> > > > > > > unclean
> > > > >> > > > > > > >> > > leader election or correlated replica failure.
> > > > >> > > > > > > >> > > 3. It can reset offsets better when topic is
> > > > recreated.
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> > > Thanks,
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> > > Jun
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <
> > > > >> > > lindon...@gmail.com
> > > > >> > > > >
> > > > >> > > > > > > wrote:
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> > > > Hey Jun,
> > > > >> > > > > > > >> > > >
> > > > >> > > > > > > >> > > > Yeah I agree that ideally we don't want an
> ever
> > > > >> growing
> > > > >> > > > global
> > > > >> > > > > > > >> metadata
> > > > >> > > > > > > >> > > > version. I just think it may be more
> desirable
> > to
> > > > >> keep
> > > > >> > the
> > > > >> > > > > > > consumer
> > > > >> > > > > > > >> API
> > > > >> > > > > > > >> > > > simple.
> > > > >> > > > > > > >> > > >
> > > > >> > > > > > > >> > > > In my current proposal, metadata version
> > returned
> > > > in
> > > > >> the
> > > > >> > > > fetch
> > > > >> > > > > > > >> response
> > > > >> > > > > > > >> > > > will be stored with the offset together. More
> > > > >> > > specifically,
> > > > >> > > > > the
> > > > >> > > > > > > >> > > > metadata_epoch in the new offset topic schema
> > > will
> > > > be
> > > > >> > the
> > > > >> > > > > > largest
> > > > >> > > > > > > >> > > > metadata_epoch from all the MetadataResponse
> > and
> > > > >> > > > FetchResponse
> > > > >> > > > > > > ever
> > > > >> > > > > > > >> > > > received by this consumer.
> > > > >> > > > > > > >> > > >
> > > > >> > > > > > > >> > > > We probably don't have to change the consumer
> > API
> > > > for
> > > > >> > > > > > > >> > > > commitSync(Map<TopicPartition,
> > > OffsetAndMetadata>).
> > > > >> If
> > > > >> > > user
> > > > >> > > > > > calls
> > > > >> > > > > > > >> > > > commitSync(...) to commit offset 10 for a
> given
> > > > >> > partition,
> > > > >> > > > for
> > > > >> > > > > > > most
> > > > >> > > > > > > >> > > > use-cases, this consumer instance should have
> > > > >> consumed
> > > > >> > > > message
> > > > >> > > > > > > with
> > > > >> > > > > > > >> > > offset
> > > > >> > > > > > > >> > > > 9 from this partition, in which case the
> > consumer
> > > > can
> > > > >> > > > remember
> > > > >> > > > > > and
> > > > >> > > > > > > >> use
> > > > >> > > > > > > >> > > the
> > > > >> > > > > > > >> > > > metadata_epoch from the corresponding
> > > FetchResponse
> > > > >> when
> > > > >> > > > > > > committing
> > > > >> > > > > > > >> > > offset.
> > > > >> > > > > > > >> > > > If user calls commitSync(..) to commit offset
> > 10
> > > > for
> > > > >> a
> > > > >> > > given
> > > > >> > > > > > > >> partition
> > > > >> > > > > > > >> > > > without having consumed the message with
> > offset 9
> > > > >> using
> > > > >> > > this
> > > > >> > > > > > > >> consumer
> > > > >> > > > > > > >> > > > instance, this is probably an advanced
> > use-case.
> > > In
> > > > >> this
> > > > >> > > > case
> > > > >> > > > > > the
> > > > >> > > > > > > >> > > advanced
> > > > >> > > > > > > >> > > > user can retrieve the metadata_epoch using
> the
> > > > newly
> > > > >> > added
> > > > >> > > > > > > >> > > metadataEpoch()
> > > > >> > > > > > > >> > > > API after it fetches the message with offset
> 9
> > > > >> (probably
> > > > >> > > > from
> > > > >> > > > > > > >> another
> > > > >> > > > > > > >> > > > consumer instance) and encode this
> > metadata_epoch
> > > > in
> > > > >> the
> > > > >> > > > > > > >> > > > string OffsetAndMetadata.metadata. Do you
> think
> > > > this
> > > > >> > > > solution
> > > > >> > > > > > > would
> > > > >> > > > > > > >> > work?
> > > > >> > > > > > > >> > > >
> > > > >> > > > > > > >> > > > By "not sure that I fully understand your
> > latest
> > > > >> > > > suggestion",
> > > > >> > > > > > are
> > > > >> > > > > > > >> you
> > > > >> > > > > > > >> > > > referring to solution related to unclean
> leader
> > > > >> election
> > > > >> > > > using
> > > > >> > > > > > > >> > > leader_epoch
> > > > >> > > > > > > >> > > > in my previous email?
> > > > >> > > > > > > >> > > >
> > > > >> > > > > > > >> > > > Thanks,
> > > > >> > > > > > > >> > > > Dong
> > > > >> > > > > > > >> > > >
> > > > >> > > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <
> > > > >> > j...@confluent.io
> > > > >> > > >
> > > > >> > > > > > wrote:
> > > > >> > > > > > > >> > > >
> > > > >> > > > > > > >> > > > > Hi, Dong,
> > > > >> > > > > > > >> > > > >
> > > > >> > > > > > > >> > > > > Not sure that I fully understand your
> latest
> > > > >> > suggestion.
> > > > >> > > > > > > >> Returning an
> > > > >> > > > > > > >> > > > ever
> > > > >> > > > > > > >> > > > > growing global metadata version itself is
> no
> > > > ideal,
> > > > >> > but
> > > > >> > > is
> > > > >> > > > > > fine.
> > > > >> > > > > > > >> My
> > > > >> > > > > > > >> > > > > question is whether the metadata version
> > > returned
> > > > >> in
> > > > >> > the
> > > > >> > > > > fetch
> > > > >> > > > > > > >> > response
> > > > >> > > > > > > >> > > > > needs to be stored with the offset together
> > if
> > > > >> offsets
> > > > >> > > are
> > > > >> > > > > > > stored
> > > > >> > > > > > > >> > > > > externally. If so, we also have to change
> the
> > > > >> consumer
> > > > >> > > API
> > > > >> > > > > for
> > > > >> > > > > > > >> > > > commitSync()
> > > > >> > > > > > > >> > > > > and need to worry about compatibility. If
> we
> > > > don't
> > > > >> > store
> > > > >> > > > the
> > > > >> > > > > > > >> metadata
> > > > >> > > > > > > >> > > > > version together with the offset, on a
> > consumer
> > > > >> > restart,
> > > > >> > > > > it's
> > > > >> > > > > > > not
> > > > >> > > > > > > >> > clear
> > > > >> > > > > > > >> > > > how
> > > > >> > > > > > > >> > > > > we can ensure the metadata in the consumer
> is
> > > > high
> > > > >> > > enough
> > > > >> > > > > > since
> > > > >> > > > > > > >> there
> > > > >> > > > > > > >> > > is
> > > > >> > > > > > > >> > > > no
> > > > >> > > > > > > >> > > > > metadata version to compare with.
> > > > >> > > > > > > >> > > > >
> > > > >> > > > > > > >> > > > > Thanks,
> > > > >> > > > > > > >> > > > >
> > > > >> > > > > > > >> > > > > Jun
> > > > >> > > > > > > >> > > > >
> > > > >> > > > > > > >> > > > >
> > > > >> > > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <
> > > > >> > > > > lindon...@gmail.com
> > > > >> > > > > > >
> > > > >> > > > > > > >> > wrote:
> > > > >> > > > > > > >> > > > >
> > > > >> > > > > > > >> > > > > > Hey Jun,
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > > Thanks much for the explanation.
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > > I understand the advantage of
> > partition_epoch
> > > > >> over
> > > > >> > > > > > > >> metadata_epoch.
> > > > >> > > > > > > >> > My
> > > > >> > > > > > > >> > > > > > current concern is that the use of
> > > leader_epoch
> > > > >> and
> > > > >> > > the
> > > > >> > > > > > > >> > > partition_epoch
> > > > >> > > > > > > >> > > > > > requires us considerable change to
> > consumer's
> > > > >> public
> > > > >> > > API
> > > > >> > > > > to
> > > > >> > > > > > > take
> > > > >> > > > > > > >> > care
> > > > >> > > > > > > >> > > > of
> > > > >> > > > > > > >> > > > > > the case where user stores offset
> > externally.
> > > > For
> > > > >> > > > example,
> > > > >> > > > > > > >> > > *consumer*.
> > > > >> > > > > > > >> > > > > > *commitSync*(..) would have to take a map
> > > whose
> > > > >> > value
> > > > >> > > is
> > > > >> > > > > > > >> <offset,
> > > > >> > > > > > > >> > > > > metadata,
> > > > >> > > > > > > >> > > > > > leader epoch, partition epoch>.
> > > > >> > *consumer*.*seek*(...)
> > > > >> > > > > would
> > > > >> > > > > > > >> also
> > > > >> > > > > > > >> > > need
> > > > >> > > > > > > >> > > > > > leader_epoch and partition_epoch as
> > > parameter.
> > > > >> > > > Technically
> > > > >> > > > > > we
> > > > >> > > > > > > >> can
> > > > >> > > > > > > >> > > > > probably
> > > > >> > > > > > > >> > > > > > still make it work in a backward
> compatible
> > > > >> manner
> > > > >> > > after
> > > > >> > > > > > > careful
> > > > >> > > > > > > >> > > design
> > > > >> > > > > > > >> > > > > and
> > > > >> > > > > > > >> > > > > > discussion. But these changes can make
> the
> > > > >> > consumer's
> > > > >> > > > > > > interface
> > > > >> > > > > > > >> > > > > > unnecessarily complex for more users who
> do
> > > not
> > > > >> > store
> > > > >> > > > > offset
> > > > >> > > > > > > >> > > > externally.
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > > After thinking more about it, we can
> > address
> > > > all
> > > > >> > > > problems
> > > > >> > > > > > > >> discussed
> > > > >> > > > > > > >> > > by
> > > > >> > > > > > > >> > > > > only
> > > > >> > > > > > > >> > > > > > using the metadata_epoch without
> > introducing
> > > > >> > > > leader_epoch
> > > > >> > > > > or
> > > > >> > > > > > > the
> > > > >> > > > > > > >> > > > > > partition_epoch. The current KIP
> describes
> > > the
> > > > >> > changes
> > > > >> > > > to
> > > > >> > > > > > the
> > > > >> > > > > > > >> > > consumer
> > > > >> > > > > > > >> > > > > API
> > > > >> > > > > > > >> > > > > > and how the new API can be used if user
> > > stores
> > > > >> > offset
> > > > >> > > > > > > >> externally.
> > > > >> > > > > > > >> > In
> > > > >> > > > > > > >> > > > > order
> > > > >> > > > > > > >> > > > > > to address the scenario you described
> > > earlier,
> > > > we
> > > > >> > can
> > > > >> > > > > > include
> > > > >> > > > > > > >> > > > > > metadata_epoch in the FetchResponse and
> the
> > > > >> > > > > > > LeaderAndIsrRequest.
> > > > >> > > > > > > >> > > > Consumer
> > > > >> > > > > > > >> > > > > > remembers the largest metadata_epoch from
> > all
> > > > the
> > > > >> > > > > > > FetchResponse
> > > > >> > > > > > > >> it
> > > > >> > > > > > > >> > > has
> > > > >> > > > > > > >> > > > > > received. The metadata_epoch committed
> with
> > > the
> > > > >> > > offset,
> > > > >> > > > > > either
> > > > >> > > > > > > >> > within
> > > > >> > > > > > > >> > > > or
> > > > >> > > > > > > >> > > > > > outside Kafka, should be the largest
> > > > >> metadata_epoch
> > > > >> > > > across
> > > > >> > > > > > all
> > > > >> > > > > > > >> > > > > > FetchResponse and MetadataResponse ever
> > > > received
> > > > >> by
> > > > >> > > this
> > > > >> > > > > > > >> consumer.
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > > The drawback of using only the
> > metadata_epoch
> > > > is
> > > > >> > that
> > > > >> > > we
> > > > >> > > > > can
> > > > >> > > > > > > not
> > > > >> > > > > > > >> > > always
> > > > >> > > > > > > >> > > > > do
> > > > >> > > > > > > >> > > > > > the smart offset reset in case of unclean
> > > > leader
> > > > >> > > > election
> > > > >> > > > > > > which
> > > > >> > > > > > > >> you
> > > > >> > > > > > > >> > > > > > mentioned earlier. But in most case,
> > unclean
> > > > >> leader
> > > > >> > > > > election
> > > > >> > > > > > > >> > probably
> > > > >> > > > > > > >> > > > > > happens when consumer is not
> > > > >> rebalancing/restarting.
> > > > >> > > In
> > > > >> > > > > > these
> > > > >> > > > > > > >> > cases,
> > > > >> > > > > > > >> > > > > either
> > > > >> > > > > > > >> > > > > > consumer is not directly affected by
> > unclean
> > > > >> leader
> > > > >> > > > > election
> > > > >> > > > > > > >> since
> > > > >> > > > > > > >> > it
> > > > >> > > > > > > >> > > > is
> > > > >> > > > > > > >> > > > > > not consuming from the end of the log, or
> > > > >> consumer
> > > > >> > can
> > > > >> > > > > > derive
> > > > >> > > > > > > >> the
> > > > >> > > > > > > >> > > > > > leader_epoch from the most recent message
> > > > >> received
> > > > >> > > > before
> > > > >> > > > > it
> > > > >> > > > > > > >> sees
> > > > >> > > > > > > >> > > > > > OffsetOutOfRangeException. So I am not
> sure
> > > it
> > > > is
> > > > >> > > worth
> > > > >> > > > > > adding
> > > > >> > > > > > > >> the
> > > > >> > > > > > > >> > > > > > leader_epoch to consumer API to address
> the
> > > > >> > remaining
> > > > >> > > > > corner
> > > > >> > > > > > > >> case.
> > > > >> > > > > > > >> > > What
> > > > >> > > > > > > >> > > > > do
> > > > >> > > > > > > >> > > > > > you think?
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > > Thanks,
> > > > >> > > > > > > >> > > > > > Dong
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <
> > > > >> > > > j...@confluent.io
> > > > >> > > > > >
> > > > >> > > > > > > >> wrote:
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > > > > Hi, Dong,
> > > > >> > > > > > > >> > > > > > >
> > > > >> > > > > > > >> > > > > > > Thanks for the reply.
> > > > >> > > > > > > >> > > > > > >
> > > > >> > > > > > > >> > > > > > > To solve the topic recreation issue, we
> > > could
> > > > >> use
> > > > >> > > > > either a
> > > > >> > > > > > > >> global
> > > > >> > > > > > > >> > > > > > metadata
> > > > >> > > > > > > >> > > > > > > version or a partition level epoch. But
> > > > either
> > > > >> one
> > > > >> > > > will
> > > > >> > > > > > be a
> > > > >> > > > > > > >> new
> > > > >> > > > > > > >> > > > > concept,
> > > > >> > > > > > > >> > > > > > > right? To me, the latter seems more
> > > natural.
> > > > It
> > > > >> > also
> > > > >> > > > > makes
> > > > >> > > > > > > it
> > > > >> > > > > > > >> > > easier
> > > > >> > > > > > > >> > > > to
> > > > >> > > > > > > >> > > > > > > detect if a consumer's offset is still
> > > valid
> > > > >> > after a
> > > > >> > > > > topic
> > > > >> > > > > > > is
> > > > >> > > > > > > >> > > > > recreated.
> > > > >> > > > > > > >> > > > > > As
> > > > >> > > > > > > >> > > > > > > you pointed out, we don't need to store
> > the
> > > > >> > > partition
> > > > >> > > > > > epoch
> > > > >> > > > > > > in
> > > > >> > > > > > > >> > the
> > > > >> > > > > > > >> > > > > > message.
> > > > >> > > > > > > >> > > > > > > The following is what I am thinking.
> > When a
> > > > >> > > partition
> > > > >> > > > is
> > > > >> > > > > > > >> created,
> > > > >> > > > > > > >> > > we
> > > > >> > > > > > > >> > > > > can
> > > > >> > > > > > > >> > > > > > > assign a partition epoch from an
> > > > >> ever-increasing
> > > > >> > > > global
> > > > >> > > > > > > >> counter
> > > > >> > > > > > > >> > and
> > > > >> > > > > > > >> > > > > store
> > > > >> > > > > > > >> > > > > > > it in /brokers/topics/[topic]/
> > > > >> > > > partitions/[partitionId]
> > > > >> > > > > in
> > > > >> > > > > > > ZK.
> > > > >> > > > > > > >> > The
> > > > >> > > > > > > >> > > > > > > partition
> > > > >> > > > > > > >> > > > > > > epoch is propagated to every broker.
> The
> > > > >> consumer
> > > > >> > > will
> > > > >> > > > > be
> > > > >> > > > > > > >> > tracking
> > > > >> > > > > > > >> > > a
> > > > >> > > > > > > >> > > > > > tuple
> > > > >> > > > > > > >> > > > > > > of <offset, leader epoch, partition
> > epoch>
> > > > for
> > > > >> > > > offsets.
> > > > >> > > > > > If a
> > > > >> > > > > > > >> > topic
> > > > >> > > > > > > >> > > is
> > > > >> > > > > > > >> > > > > > > recreated, it's possible that a
> > consumer's
> > > > >> offset
> > > > >> > > and
> > > > >> > > > > > leader
> > > > >> > > > > > > >> > epoch
> > > > >> > > > > > > >> > > > > still
> > > > >> > > > > > > >> > > > > > > match that in the broker, but partition
> > > epoch
> > > > >> > won't
> > > > >> > > > be.
> > > > >> > > > > In
> > > > >> > > > > > > >> this
> > > > >> > > > > > > >> > > case,
> > > > >> > > > > > > >> > > > > we
> > > > >> > > > > > > >> > > > > > > can potentially still treat the
> > consumer's
> > > > >> offset
> > > > >> > as
> > > > >> > > > out
> > > > >> > > > > > of
> > > > >> > > > > > > >> range
> > > > >> > > > > > > >> > > and
> > > > >> > > > > > > >> > > > > > reset
> > > > >> > > > > > > >> > > > > > > the offset based on the offset reset
> > policy
> > > > in
> > > > >> the
> > > > >> > > > > > consumer.
> > > > >> > > > > > > >> This
> > > > >> > > > > > > >> > > > seems
> > > > >> > > > > > > >> > > > > > > harder to do with a global metadata
> > > version.
> > > > >> > > > > > > >> > > > > > >
> > > > >> > > > > > > >> > > > > > > Jun
> > > > >> > > > > > > >> > > > > > >
> > > > >> > > > > > > >> > > > > > >
> > > > >> > > > > > > >> > > > > > >
> > > > >> > > > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong
> > Lin <
> > > > >> > > > > > > >> lindon...@gmail.com>
> > > > >> > > > > > > >> > > > wrote:
> > > > >> > > > > > > >> > > > > > >
> > > > >> > > > > > > >> > > > > > > > Hey Jun,
> > > > >> > > > > > > >> > > > > > > >
> > > > >> > > > > > > >> > > > > > > > This is a very good example. After
> > > thinking
> > > > >> > > through
> > > > >> > > > > this
> > > > >> > > > > > > in
> > > > >> > > > > > > >> > > > detail, I
> > > > >> > > > > > > >> > > > > > > agree
> > > > >> > > > > > > >> > > > > > > > that we need to commit offset with
> > leader
> > > > >> epoch
> > > > >> > in
> > > > >> > > > > order
> > > > >> > > > > > > to
> > > > >> > > > > > > >> > > address
> > > > >> > > > > > > >> > > > > > this
> > > > >> > > > > > > >> > > > > > > > example.
> > > > >> > > > > > > >> > > > > > > >
> > > > >> > > > > > > >> > > > > > > > I think the remaining question is how
> > to
> > > > >> address
> > > > >> > > the
> > > > >> > > > > > > >> scenario
> > > > >> > > > > > > >> > > that
> > > > >> > > > > > > >> > > > > the
> > > > >> > > > > > > >> > > > > > > > topic is deleted and re-created. One
> > > > possible
> > > > >> > > > solution
> > > > >> > > > > > is
> > > > >> > > > > > > to
> > > > >> > > > > > > >> > > commit
> > > > >> > > > > > > >> > > > > > > offset
> > > > >> > > > > > > >> > > > > > > > with both the leader epoch and the
> > > metadata
> > > > >> > > version.
> > > > >> > > > > The
> > > > >> > > > > > > >> logic
> > > > >> > > > > > > >> > > and
> > > > >> > > > > > > >> > > > > the
> > > > >> > > > > > > >> > > > > > > > implementation of this solution does
> > not
> > > > >> > require a
> > > > >> > > > new
> > > > >> > > > > > > >> concept
> > > > >> > > > > > > >> > > > (e.g.
> > > > >> > > > > > > >> > > > > > > > partition epoch) and it does not
> > require
> > > > any
> > > > >> > > change
> > > > >> > > > to
> > > > >> > > > > > the
> > > > >> > > > > > > >> > > message
> > > > >> > > > > > > >> > > > > > format
> > > > >> > > > > > > >> > > > > > > > or leader epoch. It also allows us to
> > > order
> > > > >> the
> > > > >> > > > > metadata
> > > > >> > > > > > > in
> > > > >> > > > > > > >> a
> > > > >> > > > > > > >> > > > > > > > straightforward manner which may be
> > > useful
> > > > in
> > > > >> > the
> > > > >> > > > > > future.
> > > > >> > > > > > > >> So it
> > > > >> > > > > > > >> > > may
> > > > >> > > > > > > >> > > > > be
> > > > >> > > > > > > >> > > > > > a
> > > > >> > > > > > > >> > > > > > > > better solution than generating a
> > random
> > > > >> > partition
> > > > >> > > > > epoch
> > > > >> > > > > > > >> every
> > > > >> > > > > > > >> > > time
> > > > >> > > > > > > >> > > > > we
> > > > >> > > > > > > >> > > > > > > > create a partition. Does this sound
> > > > >> reasonable?
> > > > >> > > > > > > >> > > > > > > >
> > > > >> > > > > > > >> > > > > > > > Previously one concern with using the
> > > > >> metadata
> > > > >> > > > version
> > > > >> > > > > > is
> > > > >> > > > > > > >> that
> > > > >> > > > > > > >> > > > > consumer
> > > > >> > > > > > > >> > > > > > > > will be forced to refresh metadata
> even
> > > if
> > > > >> > > metadata
> > > > >> > > > > > > version
> > > > >> > > > > > > >> is
> > > > >> > > > > > > >> > > > > > increased
> > > > >> > > > > > > >> > > > > > > > due to topics that the consumer is
> not
> > > > >> > interested
> > > > >> > > > in.
> > > > >> > > > > > Now
> > > > >> > > > > > > I
> > > > >> > > > > > > >> > > > realized
> > > > >> > > > > > > >> > > > > > that
> > > > >> > > > > > > >> > > > > > > > this is probably not a problem.
> > Currently
> > > > >> client
> > > > >> > > > will
> > > > >> > > > > > > >> refresh
> > > > >> > > > > > > >> > > > > metadata
> > > > >> > > > > > > >> > > > > > > > either due to
> InvalidMetadataException
> > in
> > > > the
> > > > >> > > > response
> > > > >> > > > > > > from
> > > > >> > > > > > > >> > > broker
> > > > >> > > > > > > >> > > > or
> > > > >> > > > > > > >> > > > > > due
> > > > >> > > > > > > >> > > > > > > > to metadata expiry. The addition of
> the
> > > > >> metadata
> > > > >> > > > > version
> > > > >> > > > > > > >> should
> > > > >> > > > > > > >> > > > > > increase
> > > > >> > > > > > > >> > > > > > > > the overhead of metadata refresh
> caused
> > > by
> > > > >> > > > > > > >> > > > InvalidMetadataException.
> > > > >> > > > > > > >> > > > > If
> > > > >> > > > > > > >> > > > > > > > client refresh metadata due to expiry
> > and
> > > > it
> > > > >> > > > receives
> > > > >> > > > > a
> > > > >> > > > > > > >> > metadata
> > > > >> > > > > > > >> > > > > whose
> > > > >> > > > > > > >> > > > > > > > version is lower than the current
> > > metadata
> > > > >> > > version,
> > > > >> > > > we
> > > > >> > > > > > can
> > > > >> > > > > > > >> > reject
> > > > >> > > > > > > >> > > > the
> > > > >> > > > > > > >> > > > > > > > metadata but still reset the metadata
> > > age,
> > > > >> which
> > > > >> > > > > > > essentially
> > > > >> > > > > > > >> > keep
> > > > >> > > > > > > >> > > > the
> > > > >> > > > > > > >> > > > > > > > existing behavior in the client.
> > > > >> > > > > > > >> > > > > > > >
> > > > >> > > > > > > >> > > > > > > > Thanks much,
> > > > >> > > > > > > >> > > > > > > > Dong
> > > > >> > > > > > > >> > > > > > > >
> > > > >> > > > > > > >> > > > > > >
> > > > >> > > > > > > >> > > > > >
> > > > >> > > > > > > >> > > > >
> > > > >> > > > > > > >> > > >
> > > > >> > > > > > > >> > >
> > > > >> > > > > > > >> >
> > > > >> > > > > > > >>
> > > > >> > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
