Hey Jun,

I think we can probably have a static method in a Util class to decode the byte[]. Both the KafkaConsumer implementation and the user application will be able to decode the byte array and log its content for debugging purposes. So it seems that we can still print the information we want; it is just not explicitly exposed in the consumer interface. Would this address the problem here?
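To make this concrete, here is a rough sketch of the kind of helper I have in mind. Note that the class name, the version field and the byte layout below are all made up for illustration; they are not what the KIP specifies:

    import java.nio.ByteBuffer;

    public final class OffsetEpochUtil {

        private OffsetEpochUtil() {}

        // Decode the opaque byte[] into a printable form. Assumes a
        // hypothetical encoding of a 2-byte version followed by two
        // 4-byte ints (partition_epoch, leader_epoch), big-endian.
        public static String decode(byte[] offsetEpoch) {
            ByteBuffer buffer = ByteBuffer.wrap(offsetEpoch);
            short version = buffer.getShort();
            int partitionEpoch = buffer.getInt();
            int leaderEpoch = buffer.getInt();
            return "OffsetEpoch(version=" + version
                    + ", partitionEpoch=" + partitionEpoch
                    + ", leaderEpoch=" + leaderEpoch + ")";
        }
    }

Both the consumer implementation and a user application could then call something like OffsetEpochUtil.decode(...) on the committed bytes and log the result when debugging.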
Yeah, we can include OffsetEpoch in the AdminClient. This can be added in KIP-222? Is there something you would like me to add in this KIP?

Thanks!
Dong

On Fri, Jan 19, 2018 at 3:00 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> The issue with using just byte[] for OffsetEpoch is that it won't be printable, which makes debugging harder.
>
> Also, KIP-222 proposes a listGroupOffset() method in AdminClient. If that gets adopted before this KIP, we probably want to include OffsetEpoch in the AdminClient too.
>
> Thanks,
>
> Jun
>
> On Thu, Jan 18, 2018 at 6:30 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > I agree. I have updated the KIP to remove the class OffsetEpoch and replace OffsetEpoch with byte[] in the APIs that use it. Can you see if it looks good?
> >
> > Thanks!
> > Dong
> >
> > On Thu, Jan 18, 2018 at 6:07 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the updated KIP. It looks good to me now. The only remaining thing is OffsetEpoch: if we expose the individual fields in the class, we probably don't need the encode/decode methods; if we want to hide the details of OffsetEpoch, we probably don't want to expose the individual fields.
> > >
> > > Jun
> > >
> > > On Wed, Jan 17, 2018 at 10:10 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Thinking about point 61 more, I realize that the async zookeeper read may make it less of an issue for the controller to read more zookeeper nodes. Writing the partition_epoch in the per-partition znode makes it simpler to handle a broker failure between the zookeeper writes for a topic creation. I have updated the KIP to use the suggested approach.
> > > >
> > > > On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > Thanks much for the comments. Please see my comments inline.
> > > > >
> > > > > On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Dong,
> > > > > >
> > > > > > Thanks for the updated KIP. Looks good to me overall. Just a few minor comments.
> > > > > >
> > > > > > 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition): It seems that there is no need to return metadata. We probably want to return sth like OffsetAndEpoch.
> > > > >
> > > > > Previously I thought we may want to re-use the existing class to keep our consumer interface simpler. I have updated the KIP to add the class OffsetAndOffsetEpoch. I didn't use OffsetAndEpoch because a user may confuse this name with OffsetEpoch. Does this sound OK?
> > > > >
> > > > > > 61. Should we store the partition_epoch in /brokers/topics/[topic]/partitions/[partitionId] in ZK?
> > > > >
> > > > > I have considered this. I think the advantage of adding the partition -> partition_epoch map in the existing znode /brokers/topics/[topic]/partitions is that the controller only needs to read one znode per topic to get its partition_epoch information. Otherwise the controller may need to read one extra znode per partition to get the same information.
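> > > > >
> > > > > As an illustration (this is a hypothetical layout, not the exact JSON schema proposed in the KIP), that single znode could then hold something like:
> > > > >
> > > > >     {"version": 2, "partition_epochs": {"0": 17, "1": 17, "2": 21}}
> > > > >
> > > > > so one read gives the controller the partition_epoch of every partition of the topic.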
> > > > >
> > > > > When we delete a partition or expand the partitions of a topic, someone needs to modify the partition -> partition_epoch map in the znode /brokers/topics/[topic]/partitions. This may seem a bit more complicated than simply adding or deleting the znode /brokers/topics/[topic]/partitions/[partitionId]. But the complexity is probably similar to the existing operation of modifying the partition -> replica_list mapping in the znode /brokers/topics/[topic]. So I am not sure it is better to store the partition_epoch in /brokers/topics/[topic]/partitions/[partitionId]. What do you think?
> > > > >
> > > > > > 62. For checking outdated metadata in the client, we probably want to add when max_partition_epoch will be used.
> > > > >
> > > > > The max_partition_epoch is used in the Proposed Changes -> Client's metadata refresh section to determine whether a metadata is outdated. And this formula is referenced and re-used in other sections to determine whether a metadata is outdated. Does this formula look OK?
> > > > >
> > > > > > 63. "The leader_epoch should be the largest leader_epoch of messages whose offset < the commit offset. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The partition_epoch should be read from the last FetchResponse corresponding to the given partition and commit offset.": leader_epoch and partition_epoch are associated with an offset. So, if no message is consumed, there is no offset and therefore there is no need to read leader_epoch and partition_epoch. Also, the leader_epoch associated with the offset should just come from the messages returned in the fetch response.
> > > > >
> > > > > I am thinking that, if a user calls seek(...) and commitSync(...) without consuming any messages, we should re-use the leader_epoch and partition_epoch provided by seek(...) in the OffsetCommitRequest. And if messages have been successfully consumed, then the leader_epoch will come from the messages returned in the fetch response. The condition "messages whose offset < the commit offset" is needed to take care of log compacted topics, which may have offset gaps due to log cleaning.
> > > > >
> > > > > Did I miss something here? Or should I rephrase the paragraph to make it less confusing?
> > > > >
> > > > > > 64. Could you include the public methods in the OffsetEpoch class?
> > > > >
> > > > > I mistakenly deleted the definition of the OffsetEpoch class from the KIP. I just added it back with the public methods. Could you take another look?
> > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Jun,
> > > > > > >
> > > > > > > Thanks much. I agree that we cannot rely on committed offsets to always be deleted when we delete a topic.
> > > > > > > So it is necessary to use a per-partition epoch that does not change unless the partition is deleted. I also agree that it is very nice to be able to uniquely identify a message with (offset, leader_epoch, partition_epoch) in the face of potential topic deletion and unclean leader election.
> > > > > > >
> > > > > > > I agree with all your comments. And I have updated the KIP based on our latest discussion. In addition, I added InvalidPartitionEpochException, which will be thrown by consumer.poll() if the partition_epoch associated with the partition, which can be given to the consumer using seek(...), is different from the partition_epoch in the FetchResponse.
> > > > > > >
> > > > > > > Can you take another look at the latest KIP?
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Dong
> > > > > > >
> > > > > > > On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi, Dong,
> > > > > > > >
> > > > > > > > My replies are the following.
> > > > > > > >
> > > > > > > > 60. What you described could also work. The drawback is that we will be unnecessarily changing the partition epoch when a partition hasn't really changed. I was imagining that the partition epoch will be stored in /brokers/topics/[topic]/partitions/[partitionId], instead of at the topic level. So, not sure if the ZK size limit is an issue.
> > > > > > > >
> > > > > > > > 61, 62 and 65. To me, the offset + offset_epoch is a unique identifier for a message. So, if a message hasn't changed, the offset and the associated offset_epoch ideally should remain the same (it will be kind of weird if two consumer apps save the offset on the same message, but the offset_epochs are different). partition_epoch + leader_epoch give us that; global_epoch + leader_epoch don't. If we use this approach, we can solve not only the problem that you have identified, but also other problems when there is data loss or topic re-creation, more reliably. For example, in the future, if we include the partition_epoch and leader_epoch in the fetch request, the server can do a more reliable check of whether that offset is valid or not. I am not sure that we can rely upon all external offsets to be removed on topic deletion. For example, a topic may be deleted by an admin who may not know all the applications.
> > > > > > > >
> > > > > > > > If we agree on the above, the second question is then how to reliably propagate the partition_epoch and the leader_epoch to the consumer when there are leader or partition changes. The leader_epoch comes from the message, which is reliable.
> > > > > > > > So, I was suggesting that when we store an offset, we can just store the leader_epoch from the message set containing that offset. Similarly, I was thinking that if the partition_epoch is in the fetch response, we can propagate the partition_epoch reliably when there is a partition_epoch change.
> > > > > > > >
> > > > > > > > 63. My point is that once a leader is producing a message in the new partition_epoch, ideally, we should associate the new offsets with the new partition_epoch. Otherwise, the offset_epoch won't be the correct unique identifier (useful for solving the other problems mentioned above). I was originally thinking that the leader will include the partition_epoch in the metadata cache in the fetch response. It's just that right now, the metadata cache is updated on UpdateMetadataRequest, which typically happens after the LeaderAndIsrRequest. Another approach is for the leader to cache the partition_epoch in the Partition object and return that (instead of the one in the metadata cache) in the fetch response.
> > > > > > > >
> > > > > > > > 65. It seems to me that the global_epoch and the partition_epoch have different purposes. A partition_epoch has the benefit that it (1) can be used to form a unique identifier for a message and (2) can be used to solve other corner case problems in the future. I am not sure having just a global_epoch can achieve these. The global_epoch is useful to determine which version of the metadata is newer, especially with topic deletion.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Regarding the use of the global epoch in 65), it is very similar to the proposal of the metadata_epoch we discussed earlier. The main difference is that this epoch is incremented when we create/expand/delete a topic and does not change when the controller re-sends metadata.
> > > > > > > > >
> > > > > > > > > I looked at our previous discussion. It seems that we preferred partition_epoch over metadata_epoch because 1) we prefer not to have an ever-growing metadata_epoch and 2) we can reset offsets better when a topic is re-created. The use of a global topic_epoch avoids the drawback of a quickly growing metadata_epoch. Though the global epoch does not allow us to recognize an invalid offset committed before the topic re-creation, we can probably just delete the offsets when we delete a topic.
> > > > > > > > > Thus I am not very sure whether it is still worthwhile to have a per-partition partition_epoch if the metadata already has the global epoch.
> > > > > > > > >
> > > > > > > > > On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hey Jun,
> > > > > > > > > >
> > > > > > > > > > Thanks so much. These comments are very useful. Please see my comments below.
> > > > > > > > > >
> > > > > > > > > > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Dong,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the updated KIP. A few more comments.
> > > > > > > > > > >
> > > > > > > > > > > 60. Perhaps having a partition epoch is more flexible since in the future, we may support deleting a partition as well.
> > > > > > > > > >
> > > > > > > > > > Yeah I have considered this. I think we can probably still support deleting a partition by using the topic_epoch -- when a partition of a topic is deleted or created, the epoch of all partitions of this topic will be incremented by 1. Therefore, if that partition is re-created later, the epoch of that partition will still be larger than its epoch before the deletion, which still allows the client to order the metadata for the purpose of this KIP. Does this sound reasonable?
> > > > > > > > > >
> > > > > > > > > > The advantage of using the topic_epoch instead of a partition_epoch is that the size of the /brokers/topics/[topic] znode and the request/response size can be smaller. We have a limit on the maximum size of a znode (typically 1MB). Using a partition epoch would effectively reduce the number of partitions that can be described by the /brokers/topics/[topic] znode.
> > > > > > > > > >
> > > > > > > > > > One use-case of the partition_epoch is for the client to detect that the committed offset, either from the kafka offset topic or from an external store, is invalid after partition deletion and re-creation. However, it seems that we can also address this use-case with other approaches. For example, when the AdminClient deletes partitions, it can also delete the committed offsets for those partitions from the offset topic. If a user stores offsets externally, it might make sense for the user to similarly remove the offsets of related partitions after these partitions are deleted. So I am not sure that we should use the partition_epoch in this KIP.
> > > > > > > > > >
> > > > > > > > > > > 61. It seems that the leader epoch returned in the position() call should be the leader epoch returned in the fetch response, not the one in the metadata cache of the client.
> > > > > > > > > >
> > > > > > > > > > I think this is a good idea. Just to double check, this change does not affect the correctness or performance of this KIP. But it can be useful if we want to use the leader_epoch to better handle the offset reset in case of unclean leader election, which is listed in the future work. Is this understanding correct?
> > > > > > > > > >
> > > > > > > > > > I have updated the KIP to specify that the leader_epoch returned by position() should be the largest leader_epoch of those already consumed messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek() or OffsetFetchResponse should be used. The offset included in the OffsetCommitRequest will also be determined in a similar manner.
> > > > > > > > > >
> > > > > > > > > > > 62. I am wondering if we should return the partition epoch in the fetch response as well. In the current proposal, if a topic is recreated and the new leader is on the same broker as the old one, there is nothing to force a metadata refresh in the client. So, the client may still associate the offset with the old partition epoch.
> > > > > > > > > >
> > > > > > > > > > Could you help me understand the problem if a client associates an old partition_epoch (or the topic_epoch as of the current KIP) with the offset? The main purpose of the topic_epoch is to be able to drop the leader_epoch to 0 after a partition is deleted and re-created. I guess you may be thinking about using the partition_epoch to detect that the committed offset is invalid? In that case, I am wondering if the alternative approach described in 60) would be reasonable.
> > > > > > > > > >
> > > > > > > > > > > 63. There is some subtle coordination between the LeaderAndIsrRequest and the UpdateMetadataRequest. Currently, when a leader changes, the controller first sends the LeaderAndIsrRequest to the assigned replicas and the UpdateMetadataRequest to every broker.
> > > > > > > > > > > So, there could be a small window when the leader already receives the new partition epoch in the LeaderAndIsrRequest, but the metadata cache in the broker hasn't been updated with the latest partition epoch. Not sure what's the best way to address this issue. Perhaps we can update the metadata cache on the broker with both the LeaderAndIsrRequest and the UpdateMetadataRequest. The challenge is that the two have slightly different data. For example, only the latter has all the endpoints.
> > > > > > > > > >
> > > > > > > > > > I am not sure whether this is a problem. Could you explain a bit more what specific problem this small window can cause?
> > > > > > > > > >
> > > > > > > > > > Since a client can fetch metadata from any broker in the cluster, and given that different brokers receive requests (e.g. LeaderAndIsrRequest and UpdateMetadataRequest) in arbitrary order, the metadata received by the client can be in arbitrary order (either newer or older) compared to the broker's leadership state, even if a given broker receives the LeaderAndIsrRequest and the UpdateMetadataRequest simultaneously. So I am not sure it is useful to update the broker's cache with the LeaderAndIsrRequest.
> > > > > > > > > >
> > > > > > > > > > > 64. The enforcement of leader epoch in offset commit: We allow a consumer to set an arbitrary offset. So it's possible for offsets or leader epochs to go backwards. I am not sure if we could always enforce that the leader epoch only goes up on the broker.
> > > > > > > > > >
> > > > > > > > > > Sure. I have removed this check from the KIP.
> > > > > > > > > >
> > > > > > > > > > BTW, we can probably still ensure that the leader_epoch always increases if the leader_epoch used with the offset commit is the max(leader_epoch of the message with offset = the committed offset - 1, the largest known leader_epoch from the metadata). But I don't have a good use-case for this alternative definition. So I chose to keep the KIP simple by not requiring the leader_epoch to always increase.
> > > > > > > > > >
> > > > > > > > > > > 65. Good point on handling a missing partition epoch due to topic deletion. Another potential way to address this is to additionally propagate the global partition epoch to the brokers and the clients.
> > > > > > > > > > > This way, when a partition epoch is missing, we can use the global partition epoch to reason about which metadata is more recent.
> > > > > > > > > >
> > > > > > > > > > This is a great idea. The global epoch can be used to order the metadata and help us recognize the more recent metadata if a topic (or partition) is deleted and re-created.
> > > > > > > > > >
> > > > > > > > > > Actually, it seems we only need to propagate the global epoch to brokers and clients, without propagating this epoch on a per-topic or per-partition basis. Doing so would simplify the interface changes made in this KIP. Does this approach sound reasonable?
> > > > > > > > > >
> > > > > > > > > > > 66. A client may also get an offset by time using the offsetForTimes() api. So, we probably want to include offsetInternalMetadata in OffsetAndTimestamp as well.
> > > > > > > > > >
> > > > > > > > > > You are right. This probably requires us to change the ListOffsetRequest as well. I will update the KIP after we agree on the solution for 65).
> > > > > > > > > >
> > > > > > > > > > > 67. InternalMetadata can be a bit confusing with the metadata field already there. Perhaps we can just call it OffsetEpoch. It might be useful to make OffsetEpoch printable at least for debugging purposes. Once you do that, we are already exposing the internal fields. So, not sure if it's worth hiding them. If we do want to hide them, perhaps we can have sth like the following. The binary encoding is probably more efficient than JSON for external storage.
> > > > > > > > > > >
> > > > > > > > > > > OffsetEpoch {
> > > > > > > > > > >     static OffsetEpoch decode(byte[]);
> > > > > > > > > > >
> > > > > > > > > > >     public byte[] encode();
> > > > > > > > > > >
> > > > > > > > > > >     public String toString();
> > > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > Thanks much. I like this solution. I have updated the KIP accordingly.
> > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Jason,
> > > > > > > > > > > >
> > > > > > > > > > > > Certainly. This sounds good. I have updated the KIP to clarify that the global epoch will be incremented by 1 each time a topic is deleted.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Dong
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I think your approach will allow the user to distinguish between the metadata before and after the topic deletion. I also agree that this can potentially be useful to the user. I am just not very sure whether we already have a good use-case to make the additional complexity worthwhile. It seems that this feature is kind of independent of the main problem of this KIP. Could we add this as future work?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Do you think it's fair if we bump the topic epoch on deletion and leave propagation of the epoch for deleted topics for future work? I don't think this adds much complexity and it makes the behavior consistent: every topic mutation results in an epoch bump.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Jason
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hey Ismael,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I guess we actually need the user to see this field so that the user can store this value in the external store together with the offset. We just prefer the value to be opaque to discourage most users from interpreting it. One more advantage of using such an opaque field is to be able to evolve the information (or schema) of this value without changing the consumer API in the future.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I also think it is probably OK for the user to be able to interpret this value, particularly for those advanced users.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > class OffsetAndMetadata {
> > > > > > > > > > > > > > > >     long offset;
> > > > > > > > > > > > > > > >     byte[] offsetMetadata;
> > > > > > > > > > > > > > > >     String metadata;
> > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Admittedly, the naming is a bit annoying, but we can probably come up with something better. Internally the byte array would have a version. If in the future we have anything else we need to add, we can update the version and we wouldn't need any new APIs.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > We can also add fields to a class in a compatible way. So, it seems to me that the main advantage of the byte array is that it's opaque to the user. Is that correct? If so, we could also add any opaque metadata in a subclass so that users don't even see it (unless they cast it, but then they're on their own).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Ismael
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The corresponding seek() and position() APIs might look something like this:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> > > > > > > > > > > > > > > > byte[] positionMetadata(TopicPartition partition);
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > What do you think?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Jason
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hey Jun, Jason,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks much for all the feedback. I have updated the KIP based on the latest discussion. Can you help check whether it looks good?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hmm... thinking about this more, I am not sure that the proposed API is sufficient. For users that store offsets externally, we probably need an extra API to return the leader_epoch and partition_epoch for all partitions that consumers are consuming. I suppose these users currently use position() to get the offset. Thus we probably need a new method positionWithEpoch(..) to return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks much. I like the new API that you proposed. I am not sure what you exactly mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Got it. The api that you proposed works. The question is whether that's the api that we want to have in the long term. My concern is that while the api change is simple, the new api seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > An alternative approach is to make the following compatible api changes in Consumer:
> > > > > > > > > > > > > > > > > > > > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (No need to change the commitSync() api.)
> > > > > > > > > > > > > > > > > > > > > * Add a new api seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old api seek(TopicPartition partition, long offset) in the future.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > The alternative approach has a similar amount of api changes as yours but has the following benefits:
> > > > > > > > > > > > > > > > > > > > > 1. The api works in a similar way as how offset management works now and is probably what we want in the long term.
> > > > > > > > > > > > > > > > > > > > > 2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
> > > > > > > > > > > > > > > > > > > > > 3. It can reset offsets better when a topic is recreated.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Yeah I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > In my current proposal, the metadata version returned in the fetch response will be stored with the offset together. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponses and FetchResponses ever received by this consumer.
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>).
> > > > > > > > > > > > > > > > > > > > > > If user calls commitSync(...) to commit offset 10 for a given partition, for most use-cases, this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If user calls commitSync(...) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use-case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Not sure that I fully understand your latest suggestion. Returning an ever-growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored with the offset together, if offsets are stored externally. If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, on a consumer restart, it's not clear how we can ensure the metadata in the consumer is high enough, since there is no metadata version to compare with.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Thanks much for the explanation.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and the partition_epoch requires considerable changes to the consumer's public API to take care of the case where user stores offsets externally. For example, consumer.commitSync(..) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for the majority of users, who do not store offsets externally.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > After thinking more about it, we can address all the problems discussed by only using the metadata_epoch, without introducing leader_epoch or the partition_epoch.
> > > > > > > > > > > > > > > > > > > > > > > > The current KIP describes the changes to the consumer API and how the new API can be used if user stores offsets externally. In order to address the scenario you described earlier, we can include the metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch from all the FetchResponses it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponses and MetadataResponses ever received by this consumer.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > The drawback of using only the metadata_epoch is that we can not always do the smart offset reset in case of unclean leader election, which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting.
> > > > > > > > > > > > > > > > > > > > > > > > In these cases, either the consumer is not directly affected by the unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees an OffsetOutOfRangeException. So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the reply.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > To solve the topic recreation issue, we could use either a global metadata version or a partition-level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated.
As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match what is in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.

Jun
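A rough sketch of the validity check described above, with hypothetical names (CommittedPosition and OffsetValidator are illustrative only, not proposed API):

    /** Hypothetical committed position: <offset, leader epoch, partition epoch>. */
    record CommittedPosition(long offset, int leaderEpoch, int partitionEpoch) {}

    final class OffsetValidator {

        /**
         * Even if the offset and leader epoch still fall within range after a
         * topic is recreated, a partition-epoch mismatch reveals that the
         * committed position belongs to the old incarnation of the topic, so
         * the consumer treats it as out of range and applies its reset policy.
         */
        static boolean isStillValid(CommittedPosition committed,
                                    int currentPartitionEpoch,
                                    long logStartOffset,
                                    long logEndOffset) {
            if (committed.partitionEpoch() != currentPartitionEpoch) {
                return false; // topic recreated: fall back to auto.offset.reset
            }
            return committed.offset() >= logStartOffset
                && committed.offset() <= logEndOffset;
        }
    }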
On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.

I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch) and do not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?
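One possible shape for such a committed value, sketched with made-up names (OffsetWithEpochs and its fixed-width byte layout are assumptions for illustration, not something specified in the KIP):

    import java.nio.ByteBuffer;

    /**
     * Hypothetical committed value: offset plus leader epoch plus the global
     * metadata version. The (metadataVersion, leaderEpoch) pair also gives a
     * natural total order on committed positions.
     */
    final class OffsetWithEpochs {
        final long offset;
        final int leaderEpoch;
        final long metadataVersion;

        OffsetWithEpochs(long offset, int leaderEpoch, long metadataVersion) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.metadataVersion = metadataVersion;
        }

        // Pack into a byte[] suitable for an external offset store.
        byte[] encode() {
            return ByteBuffer.allocate(8 + 4 + 8)
                    .putLong(offset)
                    .putInt(leaderEpoch)
                    .putLong(metadataVersion)
                    .array();
        }

        static OffsetWithEpochs decode(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            return new OffsetWithEpochs(buf.getLong(), buf.getInt(), buf.getLong());
        }
    }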
Previously, one concern with using the metadata version was that the consumer would be forced to refresh metadata even when the metadata version is increased due to topics that the consumer is not interested in. Now I realize that this is probably not a problem. Currently the client will refresh metadata either due to an InvalidMetadataException in the response from the broker or due to metadata expiry. The addition of the metadata version should not increase the overhead of metadata refresh caused by InvalidMetadataException.
If the client refreshes metadata due to expiry and receives metadata whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.

Thanks much,
Dong
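A minimal sketch of this client-side rule, assuming hypothetical names (ClientMetadata and maybeUpdate are illustrative; the real client's metadata handling lives in org.apache.kafka.clients.Metadata):

    /**
     * Sketch: a metadata response carrying a version lower than what the
     * client already has is rejected, but the metadata age is still reset so
     * that a refresh triggered by expiry does not loop on a lagging broker.
     */
    final class ClientMetadata {
        private long version = -1;
        private long lastRefreshMs;

        synchronized void maybeUpdate(long newVersion, long nowMs) {
            // Always reset the age: the broker answered, even if with stale data.
            this.lastRefreshMs = nowMs;
            if (newVersion < version) {
                return; // stale metadata from a lagging broker: keep current view
            }
            this.version = newVersion;
            // ... apply the new cluster view here ...
        }

        synchronized boolean needsRefresh(long nowMs, long maxAgeMs) {
            return nowMs - lastRefreshMs > maxAgeMs;
        }
    }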