Regarding the use of the global epoch in 65), it is very similar to the
metadata_epoch proposal we discussed earlier. The main difference is that
this epoch is incremented when we create/expand/delete a topic and does
not change when the controller re-sends metadata.

I looked at our previous discussion. It seems that we preferred
partition_epoch over metadata_epoch because 1) we prefer not to have an
ever-growing metadata_epoch and 2) we can reset offsets better when a topic
is re-created. The use of a global topic_epoch avoids the drawback of a
quickly growing metadata_epoch. Although the global epoch does not allow us
to recognize an invalid offset committed before the topic re-creation, we
can probably just delete the offsets when we delete a topic. Thus I am not
very sure whether it is still worthwhile to have a per-partition
partition_epoch if the metadata already carries the global epoch.
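
To make the ordering argument concrete, here is a minimal sketch of how a
client could use a single global epoch to reject stale metadata (the class and
field names are made up for illustration and are not part of the KIP):

// Hypothetical client-side bookkeeping: remember the largest global epoch seen
// so far and ignore any MetadataResponse that carries an older one.
public class GlobalEpochTracker {

    private long latestGlobalEpoch = -1L;

    // Returns true if the metadata should be applied, false if it is stale.
    public synchronized boolean maybeApply(long epochInResponse) {
        if (epochInResponse < latestGlobalEpoch) {
            // Stale metadata; the caller can still reset the metadata age so
            // that the existing refresh behavior is preserved.
            return false;
        }
        latestGlobalEpoch = epochInResponse;
        return true;
    }
}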


On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks so much. These comments are very useful. Please see my comments below.
>
> On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Thanks for the updated KIP. A few more comments.
>>
>> 60. Perhaps having a partition epoch is more flexible since in the future,
>> we may support deleting a partition as well.
>>
>
> Yeah I have considered this. I think we can probably still support
> deleting a partition by using the topic_epoch -- when a partition of a topic
> is deleted or created, the epoch of all partitions of this topic will be
> incremented by 1. Therefore, if that partition is re-created later, the
> epoch of that partition will still be larger than its epoch before the
> deletion, which still allows the client to order the metadata for the
> purpose of this KIP. Does this sound reasonable?
>
> The advantage of using topic_epoch instead of partition_epoch is that the
> size of the /brokers/topics/[topic] znode and request/response size can be
> smaller. We have a limit on the maximum size of a znode (typically 1 MB).
> Using a per-partition epoch effectively reduces the number of partitions
> that can be described by the /brokers/topics/[topic] znode.
>
> One use-case of partition_epoch is for the client to detect that the
> committed offset, either from the Kafka offset topic or from an external
> store, is invalid after partition deletion and re-creation. However, it seems
> that we can also address this use-case with other approaches. For example,
> when AdminClient deletes partitions, it can also delete the committed offsets
> for those partitions from the offset topic. If the user stores offsets
> externally, it might make sense for the user to similarly remove the offsets
> of related partitions after these partitions are deleted. So I am not sure
> that we should use partition_epoch in this KIP.
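>
> As a rough illustration of the external-store cleanup (the class and method
> below are hypothetical, not something proposed in the KIP):
>
> import java.util.Map;
> import org.apache.kafka.common.TopicPartition;
>
> public class ExternalOffsetStoreCleanup {
>     // After a topic is deleted, drop the externally stored offsets for it so
>     // that a later re-creation of the topic cannot resume from a stale position.
>     public static void onTopicDeleted(String topic,
>                                       Map<TopicPartition, Long> externalOffsets) {
>         externalOffsets.keySet().removeIf(tp -> tp.topic().equals(topic));
>     }
> }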
>
>
>>
>> 61. It seems that the leader epoch returned in the position() call should
>> be the leader epoch returned in the fetch response, not the one in the
>> metadata cache of the client.
>
>
> I think this is a good idea. Just to double check, this change does not
> affect the correctness or performance of this KIP. But it can be useful if
> we want to use the leader_epoch to better handle the offset reset in case of
> unclean leader election, which is listed as future work. Is this
> understanding correct?
>
> I have updated the KIP to specify that the leader_epoch returned by
> position() should be the largest leader_epoch of those already consumed
> messages whose offset < position. If no message has been consumed since
> consumer initialization, the leader_epoch from seek() or
> OffsetFetchResponse should be used. The leader_epoch included in the
> OffsetCommitRequest will also be determined in a similar manner.
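>
> To illustrate the rule above, here is a minimal sketch of the client-side
> bookkeeping (the class and method names are made up for illustration and are
> not part of the proposed API):
>
> // Per-partition tracking of the leader_epoch to report from position() and
> // to include in the OffsetCommitRequest, following the rule described above.
> class PartitionEpochState {
>     private int epochFromSeekOrOffsetFetch = -1; // set by seek()/OffsetFetchResponse
>     private int maxConsumedEpoch = -1;           // max leader_epoch of consumed records
>
>     void onRecordConsumed(int recordLeaderEpoch) {
>         maxConsumedEpoch = Math.max(maxConsumedEpoch, recordLeaderEpoch);
>     }
>
>     int epochForPosition() {
>         // Largest epoch among consumed messages whose offset < position; fall
>         // back to the epoch from seek()/OffsetFetchResponse if nothing consumed.
>         return maxConsumedEpoch >= 0 ? maxConsumedEpoch : epochFromSeekOrOffsetFetch;
>     }
> }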
>
>
>>
>> 62. I am wondering if we should return the partition epoch in the fetch
>> response as well. In the current proposal, if a topic is recreated and the
>> new leader is on the same broker as the old one, there is nothing to force
>> the metadata refresh in the client. So, the client may still associate the
>> offset with the old partition epoch.
>>
>
> Could you help me understand the problem if a client associates an old
> partition_epoch (or the topic_epoch as of the current KIP) with the offset?
> The main purpose of the topic_epoch is to be able to drop leader_epoch to 0
> after a partition is deleted and re-created. I guess you may be thinking
> about using the partition_epoch to detect that the committed offset is
> invalid? In that case, I am wondering if the alternative approach described
> in 60) would be reasonable.
>
>
>>
>> 63. There is some subtle coordination between the LeaderAndIsrRequest and
>> UpdateMetadataRequest. Currently, when a leader changes, the controller
>> first sends the LeaderAndIsrRequest to the assigned replicas and the
>> UpdateMetadataRequest to every broker. So, there could be a small window
>> when the leader already receives the new partition epoch in the
>> LeaderAndIsrRequest, but the metadata cache in the broker hasn't been
>> updated with the latest partition epoch. Not sure what's the best way to
>> address this issue. Perhaps we can update the metadata cache on the broker
>> with both LeaderAndIsrRequest and UpdateMetadataRequest. The challenge is
>> that the two have slightly different data. For example, only the latter
>> has
>> all endpoints.
>>
>
> I am not sure whether this is a problem. Could you explain a bit more what
> specific problem this small window can cause?
>
> Since the client can fetch metadata from any broker in the cluster, and
> given that different brokers receive requests (e.g. LeaderAndIsrRequest and
> UpdateMetadataRequest) in arbitrary order, the metadata received by the
> client can be arbitrarily newer or older than the broker's leadership state,
> even if a given broker receives LeaderAndIsrRequest and UpdateMetadataRequest
> simultaneously. So I am not sure it is useful to update the broker's cache
> with LeaderAndIsrRequest.
>
>
>> 64. The enforcement of leader epoch in Offset commit: We allow a consumer
>> to set an arbitrary offset. So it's possible for offsets or leader epoch
>> to
>> go backwards. I am not sure if we could always enforce that the leader
>> epoch only goes up on the broker.
>>
>
> Sure. I have removed this check from the KIP.
>
> BTW, we can probably still ensure that the leader_epoch always increases if
> the leader_epoch used with the offset commit is the max(leader_epoch of the
> message with offset = the committed offset - 1, the largest known
> leader_epoch from the metadata). But I don't have a good use-case for this
> alternative definition. So I chose to keep the KIP simple by not requiring
> leader_epoch to always increase.
>
>
>> 65. Good point on handling missing partition epoch due to topic deletion.
>> Another potential way to address this is to additionally propagate the
>> global partition epoch to brokers and the clients. This way, when a
>> partition epoch is missing, we can use the global partition epoch to
>> reason
>> about which metadata is more recent.
>>
>
> This is a great idea. The global epoch can be used to order the metadata
> and help us recognize the more recent metadata if a topic (or partition) is
> deleted and re-created.
>
> Actually, it seems we only need to propagate the global epoch to brokers
> and clients without propagating this epoch on a per-topic or per-partition
> basis. Doing so would simplify the interface changes made in this KIP. Does this
> approach sound reasonable?
>
>
>> 66. A client may also get an offset by time using the offsetForTimes()
>> api.
>> So, we probably want to include offsetInternalMetadata in
>> OffsetAndTimestamp
>> as well.
>>
>
> You are right. This probably requires us to change the ListOffsetRequest
> as well. I will update the KIP after we agree on the
> solution for 65).
>
>
>>
>> 67. InternalMetadata can be a bit confusing with the metadata field already
>> there. Perhaps we can just call it OffsetEpoch. It might be useful to make
>> OffsetEpoch printable, at least for debugging purposes. Once you do that, we
>> are already exposing the internal fields. So, not sure if it's worth hiding
>> them. If we do want to hide them, perhaps we can have something like the
>> following. The binary encoding is probably more efficient than JSON for
>> external storage.
>>
>> OffsetEpoch {
>>  static OffsetEpoch decode(byte[]);
>>
>>   public byte[] encode();
>>
>>   public String toString();
>> }
>>
>
> Thanks much. I like this solution. I have updated the KIP accordingly.
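>
> For readers following the thread, one possible shape of such a class, assuming
> it wraps the (partition_epoch, leader_epoch) pair discussed further down in the
> quoted messages (the exact field layout and versioning are up to the KIP):
>
> import java.nio.ByteBuffer;
>
> public class OffsetEpoch {
>     private static final byte VERSION = 0; // version byte so the layout can evolve
>
>     private final int partitionEpoch;
>     private final int leaderEpoch;
>
>     public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
>         this.partitionEpoch = partitionEpoch;
>         this.leaderEpoch = leaderEpoch;
>     }
>
>     public static OffsetEpoch decode(byte[] bytes) {
>         ByteBuffer buf = ByteBuffer.wrap(bytes);
>         buf.get(); // skip the version byte in this sketch
>         return new OffsetEpoch(buf.getInt(), buf.getInt());
>     }
>
>     public byte[] encode() {
>         return ByteBuffer.allocate(1 + 4 + 4)
>                 .put(VERSION)
>                 .putInt(partitionEpoch)
>                 .putInt(leaderEpoch)
>                 .array();
>     }
>
>     @Override
>     public String toString() {
>         return "OffsetEpoch(partitionEpoch=" + partitionEpoch
>                 + ", leaderEpoch=" + leaderEpoch + ")";
>     }
> }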
>
>
>
>>
>> Jun
>>
>> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Jason,
>> >
>> > Certainly. This sounds good. I have updated the KIP to clarify that the
>> > global epoch will be incremented by 1 each time a topic is deleted.
>> >
>> > Thanks,
>> > Dong
>> >
>> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io>
>> > wrote:
>> >
>> > > Hi Dong,
>> > >
>> > >
>> > > > I think your approach will allow users to distinguish between the
>> > > > metadata before and after the topic deletion. I also agree that this
>> > > > can potentially be useful to users. I am just not very sure whether
>> > > > we already have a good use-case to make the additional complexity
>> > > > worthwhile. It seems that this feature is kind of independent of the
>> > > > main problem of this KIP. Could we add this as future work?
>> > >
>> > >
>> > > Do you think it's fair if we bump the topic epoch on deletion and
>> leave
>> > > propagation of the epoch for deleted topics for future work? I don't
>> > think
>> > > this adds much complexity and it makes the behavior consistent: every
>> > topic
>> > > mutation results in an epoch bump.
>> > >
>> > > Thanks,
>> > > Jason
>> > >
>> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > >
>> > > > Hey Ismael,
>> > > >
>> > > > I guess we actually need user to see this field so that user can
>> store
>> > > this
>> > > > value in the external store together with the offset. We just prefer
>> > the
>> > > > value to be opaque to discourage most users from interpreting this
>> > value.
>> > > > One more advantage of using such an opaque field is to be able to
>> > evolve
>> > > > the information (or schema) of this value without changing consumer
>> API
>> > > in
>> > > > the future.
>> > > >
>> > > > I also think it is probably OK for users to be able to interpret
>> > > > this value, particularly for those advanced users.
>> > > >
>> > > > Thanks,
>> > > > Dong
>> > > >
>> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk>
>> wrote:
>> > > >
>> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <
>> ja...@confluent.io>
>> > > > > wrote:
>> > > > > >
>> > > > > > class OffsetAndMetadata {
>> > > > > >   long offset;
>> > > > > >   byte[] offsetMetadata;
>> > > > > >   String metadata;
>> > > > > > }
>> > > > >
>> > > > >
>> > > > > > Admittedly, the naming is a bit annoying, but we can probably
>> come
>> > up
>> > > > > with
>> > > > > > something better. Internally the byte array would have a
>> version.
>> > If
>> > > in
>> > > > > the
>> > > > > > future we have anything else we need to add, we can update the
>> > > version
>> > > > > and
>> > > > > > we wouldn't need any new APIs.
>> > > > > >
>> > > > >
>> > > > > We can also add fields to a class in a compatible way. So, it
>> seems
>> > to
>> > > me
>> > > > > that the main advantage of the byte array is that it's opaque to
>> the
>> > > > user.
>> > > > > Is that correct? If so, we could also add any opaque metadata in a
>> > > > subclass
>> > > > > so that users don't even see it (unless they cast it, but then
>> > they're
>> > > on
>> > > > > their own).
>> > > > >
>> > > > > Ismael
>> > > > >
>> > > > > The corresponding seek() and position() APIs might look something
>> > like
>> > > > > this:
>> > > > > >
>> > > > > > void seek(TopicPartition partition, long offset, byte[]
>> > > > offsetMetadata);
>> > > > > > byte[] positionMetadata(TopicPartition partition);
>> > > > > >
>> > > > > > What do you think?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Jason
>> > > > > >
>> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com>
>> > > wrote:
>> > > > > >
>> > > > > > > Hey Jun, Jason,
>> > > > > > >
>> > > > > > > Thanks much for all the feedback. I have updated the KIP
>> based on
>> > > the
>> > > > > > > latest discussion. Can you help check whether it looks good?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Dong
>> > > > > > >
>> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com
>> >
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Hey Jun,
>> > > > > > > >
>> > > > > > > > Hmm... thinking about this more, I am not sure that the
>> > proposed
>> > > > API
>> > > > > is
>> > > > > > > > sufficient. For users that store offset externally, we
>> probably
>> > > > need
>> > > > > > > extra
>> > > > > > > > API to return the leader_epoch and partition_epoch for all
>> > > > partitions
>> > > > > > > that
>> > > > > > > > consumers are consuming. I suppose these users currently use
>> > > > > position()
>> > > > > > > to
>> > > > > > > > get the offset. Thus we probably need a new method
>> > > > > > positionWithEpoch(..)
>> > > > > > > to
>> > > > > > > > return <offset, partition_epoch, leader_epoch>. Does this
>> sound
>> > > > > > > reasonable?
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Dong
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io>
>> > > wrote:
>> > > > > > > >
>> > > > > > > >> Hi, Dong,
>> > > > > > > >>
>> > > > > > > >> Yes, that's what I am thinking. OffsetEpoch will be
>> composed
>> > of
>> > > > > > > >> (partition_epoch,
>> > > > > > > >> leader_epoch).
>> > > > > > > >>
>> > > > > > > >> Thanks,
>> > > > > > > >>
>> > > > > > > >> Jun
>> > > > > > > >>
>> > > > > > > >>
>> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <
>> lindon...@gmail.com
>> > >
>> > > > > wrote:
>> > > > > > > >>
>> > > > > > > >> > Hey Jun,
>> > > > > > > >> >
>> > > > > > > >> > Thanks much. I like the new API that you proposed. I
>> am
>> > > not
>> > > > > sure
>> > > > > > > >> what
>> > > > > > > >> > you exactly mean by offset_epoch. I suppose that we can
>> use
>> > > the
>> > > > > pair
>> > > > > > > of
>> > > > > > > >> > (partition_epoch, leader_epoch) as the offset_epoch,
>> right?
>> > > > > > > >> >
>> > > > > > > >> > Thanks,
>> > > > > > > >> > Dong
>> > > > > > > >> >
>> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <
>> j...@confluent.io>
>> > > > wrote:
>> > > > > > > >> >
>> > > > > > > >> > > Hi, Dong,
>> > > > > > > >> > >
>> > > > > > > >> > > Got it. The api that you proposed works. The question
>> is
>> > > > whether
>> > > > > > > >> that's
>> > > > > > > >> > the
>> > > > > > > >> > > api that we want to have in the long term. My concern
>> is
>> > > that
>> > > > > > while
>> > > > > > > >> the
>> > > > > > > >> > api
>> > > > > > > >> > > change is simple, the new api seems harder to explain
>> and
>> > > use.
>> > > > > For
>> > > > > > > >> > example,
>> > > > > > > >> > > a consumer storing offsets externally now needs to call
>> > > > > > > >> > > waitForMetadataUpdate() after calling seek().
>> > > > > > > >> > >
>> > > > > > > >> > > An alternative approach is to make the following
>> > compatible
>> > > > api
>> > > > > > > >> changes
>> > > > > > > >> > in
>> > > > > > > >> > > Consumer.
>> > > > > > > >> > > * Add an additional OffsetEpoch field in
>> > OffsetAndMetadata.
>> > > > (no
>> > > > > > need
>> > > > > > > >> to
>> > > > > > > >> > > change the CommitSync() api)
>> > > > > > > >> > > * Add a new api seek(TopicPartition partition, long
>> > offset,
>> > > > > > > >> OffsetEpoch
>> > > > > > > >> > > offsetEpoch). We can potentially deprecate the old api
>> > > > > > > >> > seek(TopicPartition
>> > > > > > > >> > > partition, long offset) in the future.
>> > > > > > > >> > >
>> > > > > > > >> > > The alternative approach has a similar amount of api
>> changes
>> > > as
>> > > > > > yours
>> > > > > > > >> but
>> > > > > > > >> > has
>> > > > > > > >> > > the following benefits.
>> > > > > > > >> > > 1. The api works in a similar way as how offset
>> management
>> > > > works
>> > > > > > now
>> > > > > > > >> and
>> > > > > > > >> > is
>> > > > > > > >> > > probably what we want in the long term.
>> > > > > > > >> > > 2. It can reset offsets better when there is data loss
>> due
>> > > to
>> > > > > > > unclean
>> > > > > > > >> > > leader election or correlated replica failure.
>> > > > > > > >> > > 3. It can reset offsets better when topic is recreated.
>> > > > > > > >> > >
>> > > > > > > >> > > Thanks,
>> > > > > > > >> > >
>> > > > > > > >> > > Jun
>> > > > > > > >> > >
>> > > > > > > >> > >
>> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <
>> > > lindon...@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > >> > >
>> > > > > > > >> > > > Hey Jun,
>> > > > > > > >> > > >
>> > > > > > > >> > > > Yeah I agree that ideally we don't want an ever
>> growing
>> > > > global
>> > > > > > > >> metadata
>> > > > > > > >> > > > version. I just think it may be more desirable to
>> keep
>> > the
>> > > > > > > consumer
>> > > > > > > >> API
>> > > > > > > >> > > > simple.
>> > > > > > > >> > > >
>> > > > > > > >> > > > In my current proposal, metadata version returned in
>> the
>> > > > fetch
>> > > > > > > >> response
>> > > > > > > >> > > > will be stored with the offset together. More
>> > > specifically,
>> > > > > the
>> > > > > > > >> > > > metadata_epoch in the new offset topic schema will be
>> > the
>> > > > > > largest
>> > > > > > > >> > > > metadata_epoch from all the MetadataResponse and
>> > > > FetchResponse
>> > > > > > > ever
>> > > > > > > >> > > > received by this consumer.
>> > > > > > > >> > > >
>> > > > > > > >> > > > We probably don't have to change the consumer API for
>> > > > > > > >> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>).
>> If
>> > > user
>> > > > > > calls
>> > > > > > > >> > > > commitSync(...) to commit offset 10 for a given
>> > partition,
>> > > > for
>> > > > > > > most
>> > > > > > > >> > > > use-cases, this consumer instance should have
>> consumed
>> > > > message
>> > > > > > > with
>> > > > > > > >> > > offset
>> > > > > > > >> > > > 9 from this partition, in which case the consumer can
>> > > > remember
>> > > > > > and
>> > > > > > > >> use
>> > > > > > > >> > > the
>> > > > > > > >> > > > metadata_epoch from the corresponding FetchResponse
>> when
>> > > > > > > committing
>> > > > > > > >> > > offset.
>> > > > > > > >> > > > If user calls commitSync(..) to commit offset 10 for
>> a
>> > > given
>> > > > > > > >> partition
>> > > > > > > >> > > > without having consumed the message with offset 9
>> using
>> > > this
>> > > > > > > >> consumer
>> > > > > > > >> > > > instance, this is probably an advanced use-case. In
>> this
>> > > > case
>> > > > > > the
>> > > > > > > >> > > advanced
>> > > > > > > >> > > > user can retrieve the metadata_epoch using the newly
>> > added
>> > > > > > > >> > > metadataEpoch()
>> > > > > > > >> > > > API after it fetches the message with offset 9
>> (probably
>> > > > from
>> > > > > > > >> another
>> > > > > > > >> > > > consumer instance) and encode this metadata_epoch in
>> the
>> > > > > > > >> > > > string OffsetAndMetadata.metadata. Do you think this
>> > > > solution
>> > > > > > > would
>> > > > > > > >> > work?
>> > > > > > > >> > > >
>> > > > > > > >> > > > By "not sure that I fully understand your latest
>> > > > suggestion",
>> > > > > > are
>> > > > > > > >> you
>> > > > > > > >> > > > referring to solution related to unclean leader
>> election
>> > > > using
>> > > > > > > >> > > leader_epoch
>> > > > > > > >> > > > in my previous email?
>> > > > > > > >> > > >
>> > > > > > > >> > > > Thanks,
>> > > > > > > >> > > > Dong
>> > > > > > > >> > > >
>> > > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <
>> > j...@confluent.io
>> > > >
>> > > > > > wrote:
>> > > > > > > >> > > >
>> > > > > > > >> > > > > Hi, Dong,
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Not sure that I fully understand your latest
>> > suggestion.
>> > > > > > > >> Returning an
>> > > > > > > >> > > > ever
>> > > > > growing global metadata version itself is not ideal, but is
>> > but
>> > > is
>> > > > > > fine.
>> > > > > > > >> My
>> > > > > > > >> > > > > question is whether the metadata version returned
>> in
>> > the
>> > > > > fetch
>> > > > > > > >> > response
>> > > > > > > >> > > > > needs to be stored with the offset together if
>> offsets
>> > > are
>> > > > > > > stored
>> > > > > > > >> > > > > externally. If so, we also have to change the
>> consumer
>> > > API
>> > > > > for
>> > > > > > > >> > > > commitSync()
>> > > > > > > >> > > > > and need to worry about compatibility. If we don't
>> > store
>> > > > the
>> > > > > > > >> metadata
>> > > > > > > >> > > > > version together with the offset, on a consumer
>> > restart,
>> > > > > it's
>> > > > > > > not
>> > > > > > > >> > clear
>> > > > > > > >> > > > how
>> > > > > > > >> > > > > we can ensure the metadata in the consumer is high
>> > > enough
>> > > > > > since
>> > > > > > > >> there
>> > > > > > > >> > > is
>> > > > > > > >> > > > no
>> > > > > > > >> > > > > metadata version to compare with.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Thanks,
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Jun
>> > > > > > > >> > > > >
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <
>> > > > > lindon...@gmail.com
>> > > > > > >
>> > > > > > > >> > wrote:
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > > Hey Jun,
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Thanks much for the explanation.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > I understand the advantage of partition_epoch
>> over
>> > > > > > > >> metadata_epoch.
>> > > > > > > >> > My
>> > > > > > > >> > > > > > current concern is that the use of leader_epoch
>> and
>> > > the
>> > > > > > > >> > > partition_epoch
>> > > > > > requires considerable changes to the consumer's public
>> public
>> > > API
>> > > > > to
>> > > > > > > take
>> > > > > > > >> > care
>> > > > > > > >> > > > of
>> > > > > > > >> > > > > > the case where user stores offset externally. For
>> > > > example,
>> > > > > > > >> > > *consumer*.
>> > > > > > > >> > > > > > *commitSync*(..) would have to take a map whose
>> > value
>> > > is
>> > > > > > > >> <offset,
>> > > > > > > >> > > > > metadata,
>> > > > > > > >> > > > > > leader epoch, partition epoch>.
>> > *consumer*.*seek*(...)
>> > > > > would
>> > > > > > > >> also
>> > > > > > > >> > > need
>> > > > > > > >> > > > > > leader_epoch and partition_epoch as parameter.
>> > > > Technically
>> > > > > > we
>> > > > > > > >> can
>> > > > > > > >> > > > > probably
>> > > > > > > >> > > > > > still make it work in a backward compatible
>> manner
>> > > after
>> > > > > > > careful
>> > > > > > > >> > > design
>> > > > > > > >> > > > > and
>> > > > > > > >> > > > > > discussion. But these changes can make the
>> > consumer's
>> > > > > > > interface
>> > > > > > > >> > > > > > unnecessarily complex for most users who do not
>> > store
>> > > > > offset
>> > > > > > > >> > > > externally.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > After thinking more about it, we can address all
>> > > > problems
>> > > > > > > >> discussed
>> > > > > > > >> > > by
>> > > > > > > >> > > > > only
>> > > > > > > >> > > > > > using the metadata_epoch without introducing
>> > > > leader_epoch
>> > > > > or
>> > > > > > > the
>> > > > > > > >> > > > > > partition_epoch. The current KIP describes the
>> > changes
>> > > > to
>> > > > > > the
>> > > > > > > >> > > consumer
>> > > > > > > >> > > > > API
>> > > > > > > >> > > > > > and how the new API can be used if user stores
>> > offset
>> > > > > > > >> externally.
>> > > > > > > >> > In
>> > > > > > > >> > > > > order
>> > > > > > > >> > > > > > to address the scenario you described earlier, we
>> > can
>> > > > > > include
>> > > > > > > >> > > > > > metadata_epoch in the FetchResponse and the
>> > > > > > > LeaderAndIsrRequest.
>> > > > > > > >> > > > Consumer
>> > > > > > > >> > > > > > remembers the largest metadata_epoch from all the
>> > > > > > > FetchResponse
>> > > > > > > >> it
>> > > > > > > >> > > has
>> > > > > > > >> > > > > > received. The metadata_epoch committed with the
>> > > offset,
>> > > > > > either
>> > > > > > > >> > within
>> > > > > > > >> > > > or
>> > > > > > > >> > > > > > outside Kafka, should be the largest
>> metadata_epoch
>> > > > across
>> > > > > > all
>> > > > > > > >> > > > > > FetchResponse and MetadataResponse ever received
>> by
>> > > this
>> > > > > > > >> consumer.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > The drawback of using only the metadata_epoch is
>> > that
>> > > we
>> > > > > can
>> > > > > > > not
>> > > > > > > >> > > always
>> > > > > > > >> > > > > do
>> > > > > > > >> > > > > > the smart offset reset in case of unclean leader
>> > > > election
>> > > > > > > which
>> > > > > > > >> you
>> > > > > > > >> > > > > > mentioned earlier. But in most cases, unclean
>> leader
>> > > > > election
>> > > > > > > >> > probably
>> > > > > > > >> > > > > > happens when consumer is not
>> rebalancing/restarting.
>> > > In
>> > > > > > these
>> > > > > > > >> > cases,
>> > > > > > > >> > > > > either
>> > > > > > > >> > > > > > consumer is not directly affected by unclean
>> leader
>> > > > > election
>> > > > > > > >> since
>> > > > > > > >> > it
>> > > > > > > >> > > > is
>> > > > > > > >> > > > > > not consuming from the end of the log, or
>> consumer
>> > can
>> > > > > > derive
>> > > > > > > >> the
>> > > > > > > >> > > > > > leader_epoch from the most recent message
>> received
>> > > > before
>> > > > > it
>> > > > > > > >> sees
>> > > > > > > >> > > > > > OffsetOutOfRangeException. So I am not sure it is
>> > > worth
>> > > > > > adding
>> > > > > > > >> the
>> > > > > > > >> > > > > > leader_epoch to consumer API to address the
>> > remaining
>> > > > > corner
>> > > > > > > >> case.
>> > > > > > > >> > > What
>> > > > > > > >> > > > > do
>> > > > > > > >> > > > > > you think?
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Thanks,
>> > > > > > > >> > > > > > Dong
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <
>> > > > j...@confluent.io
>> > > > > >
>> > > > > > > >> wrote:
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > > Hi, Dong,
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > Thanks for the reply.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > To solve the topic recreation issue, we could
>> use
>> > > > > either a
>> > > > > > > >> global
>> > > > > > > >> > > > > > metadata
>> > > > > > > >> > > > > > > version or a partition level epoch. But either
>> one
>> > > > will
>> > > > > > be a
>> > > > > > > >> new
>> > > > > > > >> > > > > concept,
>> > > > > > > >> > > > > > > right? To me, the latter seems more natural. It
>> > also
>> > > > > makes
>> > > > > > > it
>> > > > > > > >> > > easier
>> > > > > > > >> > > > to
>> > > > > > > >> > > > > > > detect if a consumer's offset is still valid
>> > after a
>> > > > > topic
>> > > > > > > is
>> > > > > > > >> > > > > recreated.
>> > > > > > > >> > > > > > As
>> > > > > > > >> > > > > > > you pointed out, we don't need to store the
>> > > partition
>> > > > > > epoch
>> > > > > > > in
>> > > > > > > >> > the
>> > > > > > > >> > > > > > message.
>> > > > > > > >> > > > > > > The following is what I am thinking. When a
>> > > partition
>> > > > is
>> > > > > > > >> created,
>> > > > > > > >> > > we
>> > > > > > > >> > > > > can
>> > > > > > > >> > > > > > > assign a partition epoch from an
>> ever-increasing
>> > > > global
>> > > > > > > >> counter
>> > > > > > > >> > and
>> > > > > > > >> > > > > store
>> > > > > > > >> > > > > > > it in /brokers/topics/[topic]/
>> > > > partitions/[partitionId]
>> > > > > in
>> > > > > > > ZK.
>> > > > > > > >> > The
>> > > > > > > >> > > > > > > partition
>> > > > > > > >> > > > > > > epoch is propagated to every broker. The
>> consumer
>> > > will
>> > > > > be
>> > > > > > > >> > tracking
>> > > > > > > >> > > a
>> > > > > > > >> > > > > > tuple
>> > > > > > > >> > > > > > > of <offset, leader epoch, partition epoch> for
>> > > > offsets.
>> > > > > > If a
>> > > > > > > >> > topic
>> > > > > > > >> > > is
>> > > > > > > >> > > > > > > recreated, it's possible that a consumer's
>> offset
>> > > and
>> > > > > > leader
>> > > > > > > >> > epoch
>> > > > > > > >> > > > > still
>> > > > > > > >> > > > > > > match that in the broker, but partition epoch
>> > won't
>> > > > be.
>> > > > > In
>> > > > > > > >> this
>> > > > > > > >> > > case,
>> > > > > > > >> > > > > we
>> > > > > > > >> > > > > > > can potentially still treat the consumer's
>> offset
>> > as
>> > > > out
>> > > > > > of
>> > > > > > > >> range
>> > > > > > > >> > > and
>> > > > > > > >> > > > > > reset
>> > > > > > > >> > > > > > > the offset based on the offset reset policy in
>> the
>> > > > > > consumer.
>> > > > > > > >> This
>> > > > > > > >> > > > seems
>> > > > > > > >> > > > > > > harder to do with a global metadata version.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > Jun
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <
>> > > > > > > >> lindon...@gmail.com>
>> > > > > > > >> > > > wrote:
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > > Hey Jun,
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > This is a very good example. After thinking
>> > > through
>> > > > > this
>> > > > > > > in
>> > > > > > > >> > > > detail, I
>> > > > > > > >> > > > > > > agree
>> > > > > > > >> > > > > > > > that we need to commit offset with leader
>> epoch
>> > in
>> > > > > order
>> > > > > > > to
>> > > > > > > >> > > address
>> > > > > > > >> > > > > > this
>> > > > > > > >> > > > > > > > example.
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > I think the remaining question is how to
>> address
>> > > the
>> > > > > > > >> scenario
>> > > > > > > >> > > that
>> > > > > > > >> > > > > the
>> > > > > > > >> > > > > > > > topic is deleted and re-created. One possible
>> > > > solution
>> > > > > > is
>> > > > > > > to
>> > > > > > > >> > > commit
>> > > > > > > >> > > > > > > offset
>> > > > > > > >> > > > > > > > with both the leader epoch and the metadata
>> > > version.
>> > > > > The
>> > > > > > > >> logic
>> > > > > > > >> > > and
>> > > > > > > >> > > > > the
>> > > > > > > >> > > > > > > > implementation of this solution does not
>> > require a
>> > > > new
>> > > > > > > >> concept
>> > > > > > > >> > > > (e.g.
>> > > > > > > >> > > > > > > > partition epoch) and it does not require any
>> > > change
>> > > > to
>> > > > > > the
>> > > > > > > >> > > message
>> > > > > > > >> > > > > > format
>> > > > > > > >> > > > > > > > or leader epoch. It also allows us to order
>> the
>> > > > > metadata
>> > > > > > > in
>> > > > > > > >> a
>> > > > > > > >> > > > > > > > straightforward manner which may be useful in
>> > the
>> > > > > > future.
>> > > > > > > >> So it
>> > > > > > > >> > > may
>> > > > > > > >> > > > > be
>> > > > > > > >> > > > > > a
>> > > > > > > >> > > > > > > > better solution than generating a random
>> > partition
>> > > > > epoch
>> > > > > > > >> every
>> > > > > > > >> > > time
>> > > > > > > >> > > > > we
>> > > > > > > >> > > > > > > > create a partition. Does this sound
>> reasonable?
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > Previously one concern with using the
>> metadata
>> > > > version
>> > > > > > is
>> > > > > > > >> that
>> > > > > > > >> > > > > consumer
>> > > > > > > >> > > > > > > > will be forced to refresh metadata even if
>> > > metadata
>> > > > > > > version
>> > > > > > > >> is
>> > > > > > > >> > > > > > increased
>> > > > > > > >> > > > > > > > due to topics that the consumer is not
>> > interested
>> > > > in.
>> > > > > > Now
>> > > > > > > I
>> > > > > > > >> > > > realized
>> > > > > > > >> > > > > > that
>> > > > > > > >> > > > > > > > this is probably not a problem. Currently
>> client
>> > > > will
>> > > > > > > >> refresh
>> > > > > > > >> > > > > metadata
>> > > > > > > >> > > > > > > > either due to InvalidMetadataException in the
>> > > > response
>> > > > > > > from
>> > > > > > > >> > > broker
>> > > > > > > >> > > > or
>> > > > > > > >> > > > > > due
>> > > > > > > >> > > > > > > > to metadata expiry. The addition of the
>> metadata
>> > > > > version
>> > > > > > > >> should not
>> > > > > > > >> > > > > > increase
>> > > > > > > >> > > > > > > > the overhead of metadata refresh caused by
>> > > > > > > >> > > > InvalidMetadataException.
>> > > > > > > >> > > > > If
>> > > > > > > >> > > > > > > > client refresh metadata due to expiry and it
>> > > > receives
>> > > > > a
>> > > > > > > >> > metadata
>> > > > > > > >> > > > > whose
>> > > > > > > >> > > > > > > > version is lower than the current metadata
>> > > version,
>> > > > we
>> > > > > > can
>> > > > > > > >> > reject
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > > > > metadata but still reset the metadata age,
>> which
>> > > > > > > essentially
>> > > > > > > >> > keep
>> > > > > > > >> > > > the
>> > > > > > > >> > > > > > > > existing behavior in the client.
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > Thanks much,
>> > > > > > > >> > > > > > > > Dong
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > >
>> > > > > > > >> > > >
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >>
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
