Hi, Dong, The current KIP looks good to me.
Thanks,

Jun

On Tue, Jan 23, 2018 at 12:29 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Do you think the current KIP looks OK? I am wondering if we can open the voting thread.

Thanks!
Dong

On Fri, Jan 19, 2018 at 3:08 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

I think we can probably have a static method in a Util class to decode the byte[]. Both the KafkaConsumer implementation and the user application will be able to decode the byte array and log its content for debug purposes. So it seems that we can still print the information we want; it is just not explicitly exposed in the consumer interface. Would this address the problem here?

Yeah, we can include OffsetEpoch in AdminClient. Can this be added in KIP-222? Is there something you would like me to add in this KIP?

Thanks!
Dong

On Fri, Jan 19, 2018 at 3:00 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

The issue with using just byte[] for OffsetEpoch is that it won't be printable, which makes debugging harder.

Also, KIP-222 proposes a listGroupOffset() method in AdminClient. If that gets adopted before this KIP, we probably want to include OffsetEpoch in the AdminClient too.

Thanks,

Jun

On Thu, Jan 18, 2018 at 6:30 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

I agree. I have updated the KIP to remove the class OffsetEpoch and replace OffsetEpoch with byte[] in the APIs that use it. Can you see if it looks good?

Thanks!
Dong

On Thu, Jan 18, 2018 at 6:07 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the updated KIP. It looks good to me now. The only thing is for OffsetEpoch: if we expose the individual fields in the class, we probably don't need the encode/decode methods. If we want to hide the details of OffsetEpoch, we probably don't want to expose the individual fields.

Jun

On Wed, Jan 17, 2018 at 10:10 AM, Dong Lin <lindon...@gmail.com> wrote:

Thinking about point 61 more, I realize that the async zookeeper read may make it less of an issue for the controller to read more zookeeper nodes. Writing partition_epoch in the per-partition znode makes it simpler to handle a broker failure between the zookeeper writes for a topic creation. I have updated the KIP to use the suggested approach.

On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for the comments. Please see my comments inline.

On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Looks good to me overall. Just a few minor comments.
>
> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition): It seems that there is no need to return metadata. We probably want to return sth like OffsetAndEpoch.
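As a rough illustration, a return type along these lines might look like the sketch below, assuming the epoch is carried as the opaque byte[] that the later messages settle on; the class and field names here are only placeholders, not the KIP's definitions.

    import java.util.Arrays;

    // Hypothetical value type returned by a positionAndOffsetEpoch(TopicPartition)-style call.
    public final class OffsetAndOffsetEpoch {
        private final long offset;
        private final byte[] offsetEpoch;   // opaque to the application; only stored and passed back

        public OffsetAndOffsetEpoch(long offset, byte[] offsetEpoch) {
            this.offset = offset;
            this.offsetEpoch = Arrays.copyOf(offsetEpoch, offsetEpoch.length);
        }

        public long offset() { return offset; }

        public byte[] offsetEpoch() { return Arrays.copyOf(offsetEpoch, offsetEpoch.length); }
    }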
Previously I thought we may want to re-use the existing class to keep our consumer interface simpler. I have updated the KIP to add the class OffsetAndOffsetEpoch. I didn't use OffsetAndEpoch because the user may confuse that name with OffsetEpoch. Does this sound OK?

> 61. Should we store partition_epoch in /brokers/topics/[topic]/partitions/[partitionId] in ZK?

I have considered this. I think the advantage of adding the partition->partition_epoch map in the existing znode /brokers/topics/[topic]/partitions is that the controller only needs to read one znode per topic to get its partition_epoch information. Otherwise the controller may need to read one extra znode per partition to get the same information.

When we delete or expand partitions of a topic, someone needs to modify the partition->partition_epoch map in the znode /brokers/topics/[topic]/partitions. This may seem a bit more complicated than simply adding or deleting the znode /brokers/topics/[topic]/partitions/[partitionId]. But the complexity is probably similar to the existing operation of modifying the partition->replica_list mapping in the znode /brokers/topics/[topic]. So I am not sure it is better to store the partition_epoch in /brokers/topics/[topic]/partitions/[partitionId]. What do you think?

> 62. For checking outdated metadata in the client, we probably want to add when max_partition_epoch will be used.

The max_partition_epoch is used in the Proposed Changes -> Client's metadata refresh section to determine whether a metadata is outdated. And this formula is referenced and re-used in other sections to determine whether a metadata is outdated. Does this formula look OK?

> 63. "The leader_epoch should be the largest leader_epoch of messages whose offset < the commit offset. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The partition_epoch should be read from the last FetchResponse corresponding to the given partition and commit offset.": leader_epoch and partition_epoch are associated with an offset. So, if no message is consumed, there is no offset and therefore there is no need to read leader_epoch and partition_epoch. Also, the leader_epoch associated with the offset should just come from the messages returned in the fetch response.

I am thinking that, if the user calls seek(..) and commitSync(...) without consuming any messages, we should re-use the leader_epoch and partition_epoch provided by the seek(...) in the OffsetCommitRequest. And if messages have been successfully consumed, then the leader_epoch will come from the messages returned in the fetch response. The condition "messages whose offset < the commit offset" is needed to take care of log compacted topics, which may have offset gaps due to log cleaning.

Did I miss something here? Or should I rephrase the paragraph to make it less confusing?

> 64. Could you include the public methods in the OffsetEpoch class?

I mistakenly deleted the definition of the OffsetEpoch class from the KIP. I just added it back with the public methods. Could you take another look?

> Jun

On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much. I agree that we cannot rely on committed offsets always being deleted when we delete a topic. So it is necessary to use a per-partition epoch that does not change unless the partition is deleted. I also agree that it is very nice to be able to uniquely identify a message with (offset, leader_epoch, partition_epoch) in the face of potential topic deletion and unclean leader election.

I agree with all your comments. And I have updated the KIP based on our latest discussion. In addition, I added InvalidPartitionEpochException, which will be thrown by consumer.poll() if the partition_epoch associated with the partition, which can be given to the consumer using seek(...), is different from the partition_epoch in the FetchResponse.

Can you take another look at the latest KIP?

Thanks!
Dong

On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

My replies are the following.

60. What you described could also work. The drawback is that we will be unnecessarily changing the partition epoch when a partition hasn't really changed. I was imagining that the partition epoch will be stored in /brokers/topics/[topic]/partitions/[partitionId], instead of at the topic level. So, not sure if the ZK size limit is an issue.

61, 62 and 65. To me, the offset + offset_epoch is a unique identifier for a message.
So, if a message hasn't changed, the offset and the associated offset_epoch ideally should remain the same (it will be kind of weird if two consumer apps save the offset on the same message, but the offset_epochs are different). partition_epoch + leader_epoch give us that. global_epoch + leader_epoch don't. If we use this approach, we can solve not only the problem that you have identified, but also other problems when there is data loss or topic re-creation, more reliably. For example, in the future, if we include the partition_epoch and leader_epoch in the fetch request, the server can do a more reliable check of whether that offset is valid or not. I am not sure that we can rely upon all external offsets to be removed on topic deletion. For example, a topic may be deleted by an admin who may not know all the applications.

If we agree on the above, the second question is then how to reliably propagate the partition_epoch and the leader_epoch to the consumer when there are leader or partition changes. The leader_epoch comes from the message, which is reliable. So, I was suggesting that when we store an offset, we can just store the leader_epoch from the message set containing that offset. Similarly, I was thinking that if the partition_epoch is in the fetch response, we can propagate the partition_epoch reliably when there is a partition_epoch change.

63. My point is that once a leader is producing a message in the new partition_epoch, ideally, we should associate the new offsets with the new partition_epoch. Otherwise, the offset_epoch won't be the correct unique identifier (useful for solving other problems mentioned above). I was originally thinking that the leader will include the partition_epoch in the metadata cache in the fetch response. It's just that right now, the metadata cache is updated on UpdateMetadataRequest, which typically happens after the LeaderAndIsrRequest. Another approach is for the leader to cache the partition_epoch in the Partition object and return that (instead of the one in the metadata cache) in the fetch response.
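As a rough illustration of the "more reliable check" mentioned above, a broker-side validation might look like the sketch below, under the assumption that the fetch request carried the client's (partition_epoch, leader_epoch); nothing like this is specified by the KIP itself.

    // Hypothetical broker-side validation, assuming the FetchRequest carried the epochs
    // that the client associated with its fetch offset.
    final class FetchOffsetValidator {
        enum Result { VALID, PARTITION_RECREATED, LOG_TRUNCATED_OR_DIVERGED }

        private final int currentPartitionEpoch;   // assumed: bumped when the partition is deleted/re-created
        private final java.util.NavigableMap<Long, Integer> leaderEpochByStartOffset;  // leader-epoch cache

        FetchOffsetValidator(int currentPartitionEpoch,
                             java.util.NavigableMap<Long, Integer> leaderEpochByStartOffset) {
            this.currentPartitionEpoch = currentPartitionEpoch;
            this.leaderEpochByStartOffset = leaderEpochByStartOffset;
        }

        Result validate(long fetchOffset, int requestPartitionEpoch, int requestLeaderEpoch) {
            if (requestPartitionEpoch != currentPartitionEpoch)
                return Result.PARTITION_RECREATED;        // offset belongs to an older incarnation of the partition
            java.util.Map.Entry<Long, Integer> e = leaderEpochByStartOffset.floorEntry(fetchOffset);
            int expectedLeaderEpoch = (e == null) ? -1 : e.getValue();
            if (requestLeaderEpoch != expectedLeaderEpoch)
                return Result.LOG_TRUNCATED_OR_DIVERGED;  // e.g. data loss after unclean leader election
            return Result.VALID;
        }
    }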
65. It seems to me that the global_epoch and the partition_epoch have different purposes. A partition_epoch has the benefit that it (1) can be used to form a unique identifier for a message and (2) can be used to solve other corner case problems in the future. I am not sure having just a global_epoch can achieve these. The global_epoch is useful to determine which version of the metadata is newer, especially with topic deletion.

Thanks,

Jun

On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindon...@gmail.com> wrote:

Regarding the use of the global epoch in 65), it is very similar to the proposal of the metadata_epoch we discussed earlier. The main difference is that this epoch is incremented when we create/expand/delete a topic and does not change when the controller re-sends metadata.

I looked at our previous discussion. It seems that we prefer partition_epoch over the metadata_epoch because 1) we prefer not to have an ever-growing metadata_epoch and 2) we can reset offsets better when a topic is re-created. The use of a global topic_epoch avoids the drawback of a quickly growing metadata_epoch. Though the global epoch does not allow us to recognize an invalid offset committed before the topic re-creation, we can probably just delete the offset when we delete a topic. Thus I am not very sure whether it is still worthwhile to have a per-partition partition_epoch if the metadata already has the global epoch.

On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks so much. These comments are very useful. Please see my comments below.

On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. A few more comments.
>
> 60. Perhaps having a partition epoch is more flexible since in the future, we may support deleting a partition as well.

Yeah, I have considered this.
I think we can probably still support deleting a partition by using the topic_epoch -- when a partition of a topic is deleted or created, the epoch of all partitions of this topic will be incremented by 1. Therefore, if that partition is re-created later, the epoch of that partition will still be larger than its epoch before the deletion, which still allows the client to order the metadata for the purpose of this KIP. Does this sound reasonable?

The advantage of using topic_epoch instead of partition_epoch is that the size of the /brokers/topics/[topic] znode and the request/response size can be smaller. We have a limit on the maximum size of a znode (typically 1MB). Using a per-partition epoch would effectively reduce the number of partitions that can be described by the /brokers/topics/[topic] znode.

One use-case of partition_epoch is for the client to detect that the committed offset, either from the kafka offset topic or from the external store, is invalid after partition deletion and re-creation. However, it seems that we can also address this use-case with other approaches. For example, when AdminClient deletes partitions, it can also delete the committed offsets for those partitions from the offset topic. If the user stores offsets externally, it might make sense for the user to similarly remove offsets of related partitions after these partitions are deleted. So I am not sure that we should use partition_epoch in this KIP.

> 61. It seems that the leader epoch returned in the position() call should be the leader epoch returned in the fetch response, not the one in the metadata cache of the client.

I think this is a good idea. Just to double check, this change does not affect the correctness or performance of this KIP. But it can be useful if we want to use the leader_epoch to better handle the offset reset in case of unclean leader election, which is listed in the future work.
Is this understanding correct?

I have updated the KIP to specify that the leader_epoch returned by position() should be the largest leader_epoch of those already consumed messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek() or OffsetFetchResponse should be used. The offset included in the OffsetCommitRequest will also be determined in a similar manner.

> 62. I am wondering if we should return the partition epoch in the fetch response as well. In the current proposal, if a topic is recreated and the new leader is on the same broker as the old one, there is nothing to force the metadata refresh in the client. So, the client may still associate the offset with the old partition epoch.

Could you help me understand the problem if a client associates an old partition_epoch (or the topic_epoch as of the current KIP) with the offset? The main purpose of the topic_epoch is to be able to drop the leader_epoch to 0 after a partition is deleted and re-created. I guess you may be thinking about using the partition_epoch to detect that the committed offset is invalid? In that case, I am wondering if the alternative approach described in 60) would be reasonable.

> 63. There is some subtle coordination between the LeaderAndIsrRequest and UpdateMetadataRequest. Currently, when a leader changes, the controller first sends the LeaderAndIsrRequest to the assigned replicas and the UpdateMetadataRequest to every broker. So, there could be a small window when the leader already receives the new partition epoch in the LeaderAndIsrRequest, but the metadata cache in the broker hasn't been updated with the latest partition epoch. Not sure what's the best way to address this issue. Perhaps we can update the metadata cache on the broker with both LeaderAndIsrRequest and UpdateMetadataRequest. The challenge is that the two have slightly different data. For example, only the latter has all endpoints.
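As a rough illustration of the bookkeeping described above (the largest leader_epoch among already consumed messages with offset < position, falling back to the epoch from seek() or OffsetFetchResponse), a consumer-side sketch might look like the following; all names are placeholders rather than the KIP's API.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative bookkeeping only; field and method names are assumptions.
    class EpochTracker {
        private static class State {
            long position;            // next offset to fetch
            int leaderEpoch = -1;     // largest leader_epoch among consumed records with offset < position
            int partitionEpoch = -1;  // partition_epoch from the last FetchResponse (or seek/OffsetFetchResponse)
        }

        private final Map<TopicPartition, State> states = new HashMap<>();

        // Called for every record returned by a FetchResponse.
        void onRecordConsumed(TopicPartition tp, long offset, int recordLeaderEpoch, int fetchPartitionEpoch) {
            State s = states.computeIfAbsent(tp, k -> new State());
            s.position = offset + 1;
            s.leaderEpoch = Math.max(s.leaderEpoch, recordLeaderEpoch);
            s.partitionEpoch = fetchPartitionEpoch;
        }

        // Called on seek(...) or when offsets are loaded from an OffsetFetchResponse.
        void onSeek(TopicPartition tp, long offset, int leaderEpoch, int partitionEpoch) {
            State s = new State();
            s.position = offset;
            s.leaderEpoch = leaderEpoch;        // reused until a record is actually consumed
            s.partitionEpoch = partitionEpoch;
            states.put(tp, s);
        }

        // Epochs to attach to an OffsetCommitRequest for the current position.
        int[] epochsForCommit(TopicPartition tp) {
            State s = states.get(tp);
            return new int[] { s.leaderEpoch, s.partitionEpoch };
        }
    }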
I am not sure whether this is a problem. Could you explain a bit more what specific problem this small window can cause?

Since the client can fetch metadata from any broker in the cluster, and given that different brokers receive requests (e.g. LeaderAndIsrRequest and UpdateMetadataRequest) in arbitrary order, the metadata received by the client can be in arbitrary order (either newer or older) compared to the broker's leadership state, even if a given broker receives LeaderAndIsrRequest and UpdateMetadataRequest simultaneously. So I am not sure it is useful to update the broker's cache with LeaderAndIsrRequest.

> 64. The enforcement of leader epoch in Offset commit: We allow a consumer to set an arbitrary offset. So it's possible for offsets or leader epoch to go backwards. I am not sure if we could always enforce that the leader epoch only goes up on the broker.

Sure. I have removed this check from the KIP.

BTW, we can probably still ensure that the leader_epoch always increases if the leader_epoch used with the offset commit is the max(leader_epoch of the message with offset = the committed offset - 1, the largest known leader_epoch from the metadata). But I don't have a good use-case for this alternative definition. So I chose to keep the KIP simple rather than require the leader_epoch to always increase.

> 65. Good point on handling missing partition epoch due to topic deletion. Another potential way to address this is to additionally propagate the global partition epoch to brokers and the clients. This way, when a partition epoch is missing, we can use the global partition epoch to reason about which metadata is more recent.

This is a great idea.
The global epoch can be used to order the metadata and help us recognize the more recent metadata if a topic (or partition) is deleted and re-created.

Actually, it seems we only need to propagate the global epoch to brokers and clients, without propagating this epoch on a per-topic or per-partition basis. Doing so would simplify the interface changes made in this KIP. Does this approach sound reasonable?

> 66. A client may also get an offset by time using the offsetForTimes() api. So, we probably want to include offsetInternalMetadata in OffsetAndTimestamp as well.

You are right. This probably also requires us to change the ListOffsetRequest as well. I will update the KIP after we agree on the solution for 65).

> 67. InternalMetadata can be a bit confusing with the metadata field already there. Perhaps we can just call it OffsetEpoch. It might be useful to make OffsetEpoch printable at least for debugging purposes. Once you do that, we are already exposing the internal fields. So, not sure if it's worth hiding them. If we do want to hide them, perhaps we can have sth like the following. The binary encoding is probably more efficient than JSON for external storage.
>
> OffsetEpoch {
>   static OffsetEpoch decode(byte[]);
>
>   public byte[] encode();
>
>   public String toString();
> }

Thanks much. I like this solution. I have updated the KIP accordingly.

> Jun

On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Certainly. This sounds good. I have updated the KIP to clarify that the global epoch will be incremented by 1 each time a topic is deleted.

Thanks,
Dong
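As a rough illustration, the OffsetEpoch sketch quoted above could be fleshed out with a versioned binary encoding along the following lines; the exact field layout and version handling here are assumptions for illustration only.

    import java.nio.ByteBuffer;

    public final class OffsetEpoch {
        private static final short CURRENT_VERSION = 0;   // assumed version prefix

        private final int partitionEpoch;
        private final int leaderEpoch;

        private OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        // Versioned binary encoding so the layout can evolve without new consumer APIs.
        public byte[] encode() {
            ByteBuffer buf = ByteBuffer.allocate(2 + 4 + 4);
            buf.putShort(CURRENT_VERSION);
            buf.putInt(partitionEpoch);
            buf.putInt(leaderEpoch);
            return buf.array();
        }

        public static OffsetEpoch decode(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            short version = buf.getShort();
            if (version != CURRENT_VERSION)
                throw new IllegalArgumentException("Unsupported OffsetEpoch version " + version);
            return new OffsetEpoch(buf.getInt(), buf.getInt());
        }

        @Override
        public String toString() {   // printable form for debugging, per the discussion above
            return "OffsetEpoch(partitionEpoch=" + partitionEpoch + ", leaderEpoch=" + leaderEpoch + ")";
        }
    }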
On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Dong,

> I think your approach will allow the user to distinguish between the metadata before and after the topic deletion. I also agree that this can potentially be useful to the user. I am just not very sure whether we already have a good use-case to make the additional complexity worthwhile. It seems that this feature is kind of independent of the main problem of this KIP. Could we add this as future work?

Do you think it's fair if we bump the topic epoch on deletion and leave propagation of the epoch for deleted topics for future work? I don't think this adds much complexity and it makes the behavior consistent: every topic mutation results in an epoch bump.

Thanks,
Jason

On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Ismael,

I guess we actually need the user to see this field so that the user can store this value in the external store together with the offset. We just prefer the value to be opaque to discourage most users from interpreting this value. One more advantage of using such an opaque field is to be able to evolve the information (or schema) of this value without changing the consumer API in the future.

I also think it is probably OK for the user to be able to interpret this value, particularly for those advanced users.

Thanks,
Dong
On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:

On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <ja...@confluent.io> wrote:

> class OffsetAndMetadata {
>   long offset;
>   byte[] offsetMetadata;
>   String metadata;
> }
>
> Admittedly, the naming is a bit annoying, but we can probably come up with something better. Internally the byte array would have a version. If in the future we have anything else we need to add, we can update the version and we wouldn't need any new APIs.

We can also add fields to a class in a compatible way. So, it seems to me that the main advantage of the byte array is that it's opaque to the user. Is that correct? If so, we could also add any opaque metadata in a subclass so that users don't even see it (unless they cast it, but then they're on their own).

Ismael

> The corresponding seek() and position() APIs might look something like this:
>
> void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> byte[] positionMetadata(TopicPartition partition);
>
> What do you think?
>
> Thanks,
> Jason
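As a rough illustration of how externally stored offsets might use APIs along these lines, consider the sketch below; the positionMetadata()/seek(..., byte[]) methods are the proposal under discussion, not methods that exist in the current consumer, and the store is a stand-in.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.common.TopicPartition;

    // Stand-in for an external offset store (e.g. a database table keyed by partition).
    final class ExternalOffsetStore {
        static final class Entry {
            final long offset;
            final byte[] offsetMetadata;   // the opaque epoch bytes saved alongside the offset
            Entry(long offset, byte[] offsetMetadata) {
                this.offset = offset;
                this.offsetMetadata = offsetMetadata;
            }
        }

        private final Map<TopicPartition, Entry> table = new ConcurrentHashMap<>();

        void save(TopicPartition tp, long offset, byte[] offsetMetadata) {
            table.put(tp, new Entry(offset, offsetMetadata));
        }

        Entry load(TopicPartition tp) {
            return table.get(tp);
        }
    }

    // Pseudo-usage with the proposed consumer methods (hypothetical, not the current API):
    //
    //   // after processing a batch:
    //   store.save(tp, consumer.position(tp), consumer.positionMetadata(tp));
    //
    //   // on restart:
    //   ExternalOffsetStore.Entry e = store.load(tp);
    //   consumer.seek(tp, e.offset, e.offsetMetadata);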
On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun, Jason,

Thanks much for all the feedback. I have updated the KIP based on the latest discussion. Can you help check whether it looks good?

Thanks,
Dong

On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Hmm... thinking about this more, I am not sure that the proposed API is sufficient. For users that store offsets externally, we probably need an extra API to return the leader_epoch and partition_epoch for all partitions that consumers are consuming. I suppose these users currently use position() to get the offset. Thus we probably need a new method positionWithEpoch(..) to return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?

Thanks,
Dong

On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).

Thanks,

Jun
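As a rough illustration, if OffsetEpoch is the pair (partition_epoch, leader_epoch), a client deciding which of two saved positions (or which metadata) is newer could plausibly compare partition_epoch first and leader_epoch second, since a higher partition_epoch indicates a newer incarnation of the partition; this ordering is an inference from the discussion, not something the KIP states.

    // Hypothetical comparison of two offset epochs; not an API defined by the KIP.
    final class EpochPair implements Comparable<EpochPair> {
        final int partitionEpoch;
        final int leaderEpoch;

        EpochPair(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        @Override
        public int compareTo(EpochPair other) {
            // A higher partition_epoch means the partition was deleted and re-created more recently.
            if (this.partitionEpoch != other.partitionEpoch)
                return Integer.compare(this.partitionEpoch, other.partitionEpoch);
            // Within the same incarnation, a higher leader_epoch is more recent.
            return Integer.compare(this.leaderEpoch, other.leaderEpoch);
        }
    }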
On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much. I like the new API that you proposed. I am not sure what you exactly mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?

Thanks,
Dong

On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Got it. The api that you proposed works. The question is whether that's the api that we want to have in the long term. My concern is that while the api change is simple, the new api seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().

An alternative approach is to make the following compatible api changes in Consumer.
* Add an additional OffsetEpoch field in OffsetAndMetadata.
  (There is no need to change the CommitSync() api.)
* Add a new api seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old api seek(TopicPartition partition, long offset) in the future.

The alternative approach has a similar amount of api changes as yours but has the following benefits.
1. The api works in a similar way as how offset management works now and is probably what we want in the long term.
2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
3. It can reset offsets better when a topic is recreated.

Thanks,

Jun
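As a rough illustration of the alternative from the application's point of view, the extra field and the new seek overload might be used as sketched below; the class and method shapes are assumptions based on this message, not the current consumer API.

    // Hypothetical shapes only -- the proposal being discussed, not the current consumer API.
    final class OffsetEpoch {                       // placeholder; composed of (partition_epoch, leader_epoch)
        final int partitionEpoch;
        final int leaderEpoch;
        OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }
    }

    final class OffsetAndMetadataWithEpoch {        // stand-in for OffsetAndMetadata gaining one field
        final long offset;
        final String metadata;
        final OffsetEpoch offsetEpoch;              // new field; commitSync(Map<TopicPartition, OffsetAndMetadata>) keeps its signature
        OffsetAndMetadataWithEpoch(long offset, String metadata, OffsetEpoch offsetEpoch) {
            this.offset = offset;
            this.metadata = metadata;
            this.offsetEpoch = offsetEpoch;
        }
    }

    // Usage on restart with the proposed overload (hypothetical):
    //
    //   consumer.seek(partition, savedOffset, savedOffsetEpoch);
    //
    // while the old seek(TopicPartition, long) could eventually be deprecated.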
On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Yeah I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.

In my current proposal, the metadata version returned in the fetch response will be stored with the offset together. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponse and FetchResponse ever received by this consumer.

We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user calls commitSync(...) to commit offset 10 for a given partition, for most use-cases, this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If the user calls commitSync(..) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use-case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata.
Do you think this solution would work?

By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?

Thanks,
Dong

On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Not sure that I fully understand your latest suggestion. Returning an ever-growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored with the offset together if offsets are stored externally. If so, we also have to change the consumer API for commitSync() and need to worry about compatibility.
If we don't store the metadata version together with the offset, on a consumer restart, it's not clear how we can ensure the metadata in the consumer is high enough, since there is no metadata version to compare with.

Thanks,

Jun

On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for the explanation.

I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and partition_epoch requires considerable change to the consumer's public API to take care of the case where the user stores offsets externally. For example, consumer.commitSync(..) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters.
Technically we can probably still make it work in a backward compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for most users, who do not store offsets externally.

After thinking more about it, we can address all the problems discussed by using only the metadata_epoch, without introducing leader_epoch or partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if the user stores offsets externally. In order to address the scenario you described earlier, we can include metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch from all the FetchResponses it has received.
The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponses and MetadataResponses ever received by this consumer.

The drawback of using only the metadata_epoch is that we cannot always do the smart offset reset in the case of the unclean leader election which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by the unclean leader election, since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees OffsetOutOfRangeException. So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?
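A minimal sketch of the bookkeeping described above, assuming the broker exposes a metadata_epoch on fetch and metadata responses; the class and method names are illustrative, not the actual client internals:

    import java.util.concurrent.atomic.AtomicLong;

    /** Illustrative tracker for the metadata_epoch-only approach. */
    final class MetadataEpochTracker {
        // Largest metadata_epoch seen in any FetchResponse or MetadataResponse so far.
        private final AtomicLong maxMetadataEpoch = new AtomicLong(-1L);

        /** Call for every FetchResponse or MetadataResponse that carries a metadata_epoch. */
        void onResponse(long metadataEpochFromResponse) {
            maxMetadataEpoch.accumulateAndGet(metadataEpochFromResponse, Math::max);
        }

        /** The epoch to store together with the offset, whether inside or outside Kafka. */
        long epochToCommit() {
            return maxMetadataEpoch.get();
        }

        /** After a restart, cached metadata older than the committed epoch must be refreshed before use. */
        static boolean metadataIsFreshEnough(long committedEpoch, long currentMetadataEpoch) {
            return currentMetadataEpoch >= committedEpoch;
        }
    }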
Thanks,
Dong

On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the reply.

To solve the topic recreation issue, we could use either a global metadata version or a partition-level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK.
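A minimal sketch of this proposal; PartitionEpochAssigner, TrackedPosition and their members are illustrative assumptions only, and the actual znode payload format and client internals are not specified here:

    import java.util.concurrent.atomic.AtomicInteger;

    /** Illustrative controller-side assignment of partition epochs from a global counter. */
    final class PartitionEpochAssigner {
        // Ever-increasing global counter; a real controller would persist this, not keep it in memory.
        private final AtomicInteger counter = new AtomicInteger(0);

        /** Epoch assigned when a partition is created and written to
         *  /brokers/topics/[topic]/partitions/[partitionId]. */
        int assignEpoch() {
            return counter.incrementAndGet();
        }
    }

    /** Illustrative tuple the consumer would track for each partition's position. */
    final class TrackedPosition {
        final long offset;
        final int leaderEpoch;
        final int partitionEpoch;

        TrackedPosition(long offset, int leaderEpoch, int partitionEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }

        /**
         * After a topic is deleted and re-created, the offset and leader epoch may still look
         * valid, but the partition epoch stored with the offset no longer matches the broker's.
         * The consumer can then treat the offset as out of range and apply its reset policy.
         */
        boolean matchesBroker(int brokerPartitionEpoch) {
            return this.partitionEpoch == brokerPartitionEpoch;
        }
    }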
The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match those in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.

Jun

On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

This is a very good example.
After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.

I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch), and it does not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition.
Does > >> this > >> > > > sound > >> > > > >> > > > >> reasonable? > >> > > > >> > > > >> > > > > > > >> > > > > > > > > >> > > > >> > > > >> > > > > > > >> > > > > > > > Previously one concern > with > >> > > using > >> > > > >> the > >> > > > >> > > > >> metadata > >> > > > >> > > > >> > > > version > >> > > > >> > > > >> > > > > > is > >> > > > >> > > > >> > > > > > > >> that > >> > > > >> > > > >> > > > > > > >> > > > > consumer > >> > > > >> > > > >> > > > > > > >> > > > > > > > will be forced to refresh > >> > > metadata > >> > > > >> even > >> > > > >> > > if > >> > > > >> > > > >> > > metadata > >> > > > >> > > > >> > > > > > > version > >> > > > >> > > > >> > > > > > > >> is > >> > > > >> > > > >> > > > > > > >> > > > > > increased > >> > > > >> > > > >> > > > > > > >> > > > > > > > due to topics that the > >> > consumer > >> > > is > >> > > > >> not > >> > > > >> > > > >> > interested > >> > > > >> > > > >> > > > in. > >> > > > >> > > > >> > > > > > Now > >> > > > >> > > > >> > > > > > > I > >> > > > >> > > > >> > > > > > > >> > > > realized > >> > > > >> > > > >> > > > > > > >> > > > > > that > >> > > > >> > > > >> > > > > > > >> > > > > > > > this is probably not a > >> > problem. > >> > > > >> > Currently > >> > > > >> > > > >> client > >> > > > >> > > > >> > > > will > >> > > > >> > > > >> > > > > > > >> refresh > >> > > > >> > > > >> > > > > > > >> > > > > metadata > >> > > > >> > > > >> > > > > > > >> > > > > > > > either due to > >> > > > >> InvalidMetadataException > >> > > > >> > in > >> > > > >> > > > the > >> > > > >> > > > >> > > > response > >> > > > >> > > > >> > > > > > > from > >> > > > >> > > > >> > > > > > > >> > > broker > >> > > > >> > > > >> > > > > > > >> > > > or > >> > > > >> > > > >> > > > > > > >> > > > > > due > >> > > > >> > > > >> > > > > > > >> > > > > > > > to metadata expiry. The > >> > addition > >> > > > of > >> > > > >> the > >> > > > >> > > > >> metadata > >> > > > >> > > > >> > > > > version > >> > > > >> > > > >> > > > > > > >> should > >> > > > >> > > > >> > > > > > > >> > > > > > increase > >> > > > >> > > > >> > > > > > > >> > > > > > > > the overhead of metadata > >> > refresh > >> > > > >> caused > >> > > > >> > > by > >> > > > >> > > > >> > > > > > > >> > > > InvalidMetadataException. > >> > > > >> > > > >> > > > > > > >> > > > > If > >> > > > >> > > > >> > > > > > > >> > > > > > > > client refresh metadata > >> due to > >> > > > >> expiry > >> > > > >> > and > >> > > > >> > > > it > >> > > > >> > > > >> > > > receives > >> > > > >> > > > >> > > > > a > >> > > > >> > > > >> > > > > > > >> > metadata > >> > > > >> > > > >> > > > > > > >> > > > > whose > >> > > > >> > > > >> > > > > > > >> > > > > > > > version is lower than the > >> > > current > >> > > > >> > > metadata > >> > > > >> > > > >> > > version, > >> > > > >> > > > >> > > > we > >> > > > >> > > > >> > > > > > can > >> > > > >> > > > >> > > > > > > >> > reject > >> > > > >> > > > >> > > > > > > >> > > > the > >> > > > >> > > > >> > > > > > > >> > > > > > > > metadata but still reset > >> the > >> > > > >> metadata > >> > > > >> > > age, > >> > > > >> > > > >> which > >> > > > >> > > > >> > > > > > > essentially > >> > > > >> > > > >> > > > > > > >> > keep > >> > > > >> > > > >> > > > > > > >> > > > the > >> > > > >> > > > >> > > > > > > >> > > > > > > > existing behavior in the > >> > client. 
Thanks much,
Dong