Hey Jun,

Thanks much for the comments. Please see my comments inline.
On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Looks good to me overall. Just a few minor comments.
>
> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition): It seems that there is no need to return metadata. We probably want to return sth like OffsetAndEpoch.

Previously I was thinking we may want to re-use the existing class to keep our consumer interface simpler. I have updated the KIP to add the class OffsetAndOffsetEpoch. I didn't use OffsetAndEpoch because users may confuse that name with OffsetEpoch. Does this sound OK?

> 61. Should we store partition_epoch in /brokers/topics/[topic]/partitions/[partitionId] in ZK?

I have considered this. I think the advantage of adding the partition->partition_epoch map in the existing znode /brokers/topics/[topic]/partitions is that the controller only needs to read one znode per topic to get its partition_epoch information. Otherwise the controller may need to read one extra znode per partition to get the same information.

When we delete a partition or expand the partitions of a topic, someone needs to modify the partition->partition_epoch map in the znode /brokers/topics/[topic]/partitions. This may seem a bit more complicated than simply adding or deleting the znode /brokers/topics/[topic]/partitions/[partitionId]. But the complexity is probably similar to the existing operation of modifying the partition->replica_list mapping in the znode /brokers/topics/[topic]. So I am not sure it is better to store the partition_epoch in /brokers/topics/[topic]/partitions/[partitionId]. What do you think?

> 62. For checking outdated metadata in the client, we probably want to add when max_partition_epoch will be used.

The max_partition_epoch is used in the Proposed Changes -> Client's metadata refresh section to determine whether the metadata is outdated. And this formula is referenced and re-used in other sections to determine whether the metadata is outdated. Does this formula look OK?

> 63. "The leader_epoch should be the largest leader_epoch of messages whose offset < the commit offset. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The partition_epoch should be read from the last FetchResponse corresponding to the given partition and commit offset.": leader_epoch and partition_epoch are associated with an offset. So, if no message is consumed, there is no offset and therefore there is no need to read leader_epoch and partition_epoch. Also, the leader_epoch associated with the offset should just come from the messages returned in the fetch response.

I am thinking that, if the user calls seek(...) and commitSync(...) without consuming any messages, we should re-use the leader_epoch and partition_epoch provided by the seek(...) in the OffsetCommitRequest. And if messages have been successfully consumed, then the leader_epoch will come from the messages returned in the fetch response. The condition "messages whose offset < the commit offset" is needed to take care of log compacted topics, which may have offset gaps due to log cleaning. Did I miss something here? Or should I rephrase the paragraph to make it less confusing?

> 64. Could you include the public methods in the OffsetEpoch class?

I mistakenly deleted the definition of the OffsetEpoch class from the KIP. I just added it back with the public methods. Could you take another look?
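To make 60 and 64 concrete, below is roughly what the two classes look like in the updated KIP (a sketch only -- the exact names and signatures are still open to change as we iterate):

public class OffsetEpoch {
    // Decodes an OffsetEpoch previously serialized with encode(). The byte
    // array is opaque to the user; internally it carries a version plus the
    // (partition_epoch, leader_epoch) pair, so the schema can evolve without
    // changing the consumer API.
    public static OffsetEpoch decode(byte[] bytes);

    // Serializes this OffsetEpoch, e.g. for storing offsets externally.
    public byte[] encode();

    // Human-readable form, mainly for debugging.
    public String toString();
}

public class OffsetAndOffsetEpoch {
    // Returned by the new consumer method positionAndOffsetEpoch(TopicPartition).
    public OffsetAndOffsetEpoch(long offset, OffsetEpoch offsetEpoch);

    public long offset();

    public OffsetEpoch offsetEpoch();
}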
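And just to double check that this is sufficient for users who store offsets externally, the intended usage would be something like the following (externalStore is a made-up placeholder for whatever storage the application uses; the seek(partition, offset, offsetEpoch) overload is the one we discussed earlier in this thread):

// Saving a consumed position together with its opaque epoch:
OffsetAndOffsetEpoch pos = consumer.positionAndOffsetEpoch(partition);
externalStore.save(partition, pos.offset(), pos.offsetEpoch().encode());

// Restoring the position after a restart:
long offset = externalStore.loadOffset(partition);
OffsetEpoch epoch = OffsetEpoch.decode(externalStore.loadEpoch(partition));
consumer.seek(partition, offset, epoch);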
> Jun
>
> On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks much. I agree that we cannot rely on committed offsets always being deleted when we delete a topic. So it is necessary to use a per-partition epoch that does not change unless this partition is deleted. I also agree that it is very nice to be able to uniquely identify a message with (offset, leader_epoch, partition_epoch) in the face of potential topic deletion and unclean leader election.
> >
> > I agree with all your comments. And I have updated the KIP based on our latest discussion. In addition, I added InvalidPartitionEpochException, which will be thrown by consumer.poll() if the partition_epoch associated with the partition, which can be given to the consumer using seek(...), is different from the partition_epoch in the FetchResponse.
> >
> > Can you take another look at the latest KIP?
> >
> > Thanks!
> > Dong
> >
> > On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > My replies are the following.
> > >
> > > 60. What you described could also work. The drawback is that we will be unnecessarily changing the partition epoch when a partition hasn't really changed. I was imagining that the partition epoch will be stored in /brokers/topics/[topic]/partitions/[partitionId], instead of at the topic level. So, not sure if the ZK size limit is an issue.
> > >
> > > 61, 62 and 65. To me, the offset + offset_epoch is a unique identifier for a message. So, if a message hasn't changed, the offset and the associated offset_epoch ideally should remain the same (it will be kind of weird if two consumer apps save the offset on the same message, but the offset_epochs are different). partition_epoch + leader_epoch give us that. global_epoch + leader_epoch don't. If we use this approach, we can solve not only the problem that you have identified, but also other problems when there is data loss or topic re-creation, more reliably. For example, in the future, if we include the partition_epoch and leader_epoch in the fetch request, the server can do a more reliable check of whether that offset is valid or not. I am not sure that we can rely upon all external offsets to be removed on topic deletion. For example, a topic may be deleted by an admin who may not know all the applications.
> > >
> > > If we agree on the above, the second question is then how to reliably propagate the partition_epoch and the leader_epoch to the consumer when there are leader or partition changes. The leader_epoch comes from the message, which is reliable. So, I was suggesting that when we store an offset, we can just store the leader_epoch from the message set containing that offset. Similarly, I was thinking that if the partition_epoch is in the fetch response, we can propagate partition_epoch reliably when there is a partition_epoch change.
> > >
> > > 63. My point is that once a leader is producing a message in the new partition_epoch, ideally, we should associate the new offsets with the new partition_epoch. Otherwise, the offset_epoch won't be the correct unique identifier (useful for solving other problems mentioned above).
> > > I was originally thinking that the leader will include the partition_epoch in the metadata cache in the fetch response. It's just that right now, the metadata cache is updated on UpdateMetadataRequest, which typically happens after the LeaderAndIsrRequest. Another approach is for the leader to cache the partition_epoch in the Partition object and return that (instead of the one in the metadata cache) in the fetch response.
> > >
> > > 65. It seems to me that the global_epoch and the partition_epoch have different purposes. A partition_epoch has the benefit that it (1) can be used to form a unique identifier for a message and (2) can be used to solve other corner case problems in the future. I am not sure having just a global_epoch can achieve these. global_epoch is useful to determine which version of the metadata is newer, especially with topic deletion.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Regarding the use of the global epoch in 65), it is very similar to the proposal of the metadata_epoch we discussed earlier. The main difference is that this epoch is incremented when we create/expand/delete a topic and does not change when the controller re-sends metadata.
> > > >
> > > > I looked at our previous discussion. It seems that we prefer partition_epoch over the metadata_epoch because 1) we prefer not to have an ever-growing metadata_epoch and 2) we can reset the offset better when a topic is re-created. The use of a global topic_epoch avoids the drawback of an ever-growing metadata_epoch. Though the global epoch does not allow us to recognize an invalid offset committed before the topic re-creation, we can probably just delete the offset when we delete a topic. Thus I am not very sure whether it is still worthwhile to have a per-partition partition_epoch if the metadata already has the global epoch.
> > > >
> > > > On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > Thanks so much. These comments are very useful. Please see below my comments.
> > > > >
> > > > > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Dong,
> > > > > >
> > > > > > Thanks for the updated KIP. A few more comments.
> > > > > >
> > > > > > 60. Perhaps having a partition epoch is more flexible since in the future, we may support deleting a partition as well.
> > > > >
> > > > > Yeah I have considered this. I think we can probably still support deleting a partition by using the topic_epoch -- when a partition of a topic is deleted or created, the epoch of all partitions of this topic will be incremented by 1. Therefore, if that partition is re-created later, the epoch of that partition will still be larger than its epoch before the deletion, which still allows the client to order the metadata for the purpose of this KIP. Does this sound reasonable?
> > > > >
> > > > > The advantage of using topic_epoch instead of partition_epoch is that the size of the /brokers/topics/[topic] znode and the request/response size can be smaller.
> > > > > We have a limit on the maximum size of a znode (typically 1 MB). Using a per-partition epoch would effectively reduce the number of partitions that can be described by the /brokers/topics/[topic] znode.
> > > > >
> > > > > One use-case of partition_epoch is for the client to detect that the committed offset, either from the Kafka offset topic or from an external store, is invalid after partition deletion and re-creation. However, it seems that we can also address this use-case with other approaches. For example, when AdminClient deletes partitions, it can also delete the committed offsets for those partitions from the offset topic. If the user stores offsets externally, it might make sense for the user to similarly remove offsets of related partitions after these partitions are deleted. So I am not sure that we should use partition_epoch in this KIP.
> > > > >
> > > > > > 61. It seems that the leader epoch returned in the position() call should be the leader epoch returned in the fetch response, not the one in the metadata cache of the client.
> > > > >
> > > > > I think this is a good idea. Just to double check, this change does not affect the correctness or performance of this KIP. But it can be useful if we want to use the leader_epoch to better handle the offset reset in case of unclean leader election, which is listed in the future work. Is this understanding correct?
> > > > >
> > > > > I have updated the KIP to specify that the leader_epoch returned by position() should be the largest leader_epoch of those already consumed messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek() or OffsetFetchResponse should be used. The offset included in the OffsetCommitRequest will also be determined in a similar manner.
> > > > >
> > > > > > 62. I am wondering if we should return the partition epoch in the fetch response as well. In the current proposal, if a topic is recreated and the new leader is on the same broker as the old one, there is nothing to force the metadata refresh in the client. So, the client may still associate the offset with the old partition epoch.
> > > > >
> > > > > Could you help me understand the problem if a client associates an old partition_epoch (or the topic_epoch as of the current KIP) with the offset? The main purpose of the topic_epoch is to be able to drop the leader_epoch to 0 after a partition is deleted and re-created. I guess you may be thinking about using the partition_epoch to detect that the committed offset is invalid? In that case, I am wondering if the alternative approach described in 60) would be reasonable.
> > > > >
> > > > > > 63. There is some subtle coordination between the LeaderAndIsrRequest and UpdateMetadataRequest. Currently, when a leader changes, the controller first sends the LeaderAndIsrRequest to the assigned replicas and the UpdateMetadataRequest to every broker.
> > > > > > So, there could be a small window when the leader has already received the new partition epoch in the LeaderAndIsrRequest, but the metadata cache in the broker hasn't been updated with the latest partition epoch. Not sure what's the best way to address this issue. Perhaps we can update the metadata cache on the broker with both LeaderAndIsrRequest and UpdateMetadataRequest. The challenge is that the two have slightly different data. For example, only the latter has all endpoints.
> > > > >
> > > > > I am not sure whether this is a problem. Could you explain a bit more what specific problem this small window can cause?
> > > > >
> > > > > Since a client can fetch metadata from any broker in the cluster, and given that different brokers receive requests (e.g. LeaderAndIsrRequest and UpdateMetadataRequest) in arbitrary order, the metadata received by the client can be in arbitrary order (either newer or older) compared to the broker's leadership state, even if a given broker receives LeaderAndIsrRequest and UpdateMetadataRequest simultaneously. So I am not sure it is useful to update the broker's cache with LeaderAndIsrRequest.
> > > > >
> > > > > > 64. The enforcement of leader epoch in offset commit: We allow a consumer to set an arbitrary offset. So it's possible for offsets or leader epoch to go backwards. I am not sure if we could always enforce that the leader epoch only goes up on the broker.
> > > > >
> > > > > Sure. I have removed this check from the KIP.
> > > > >
> > > > > BTW, we can probably still ensure that the leader_epoch always increases if the leader_epoch used with the offset commit is the max(leader_epoch of the message with offset = the committed offset - 1, the largest known leader_epoch from the metadata). But I don't have a good use-case for this alternative definition. So I chose to keep the KIP simple by not requiring the leader_epoch to always increase.
> > > > >
> > > > > > 65. Good point on handling missing partition epoch due to topic deletion. Another potential way to address this is to additionally propagate the global partition epoch to brokers and the clients. This way, when a partition epoch is missing, we can use the global partition epoch to reason about which metadata is more recent.
> > > > >
> > > > > This is a great idea. The global epoch can be used to order the metadata and help us recognize the more recent metadata if a topic (or partition) is deleted and re-created.
> > > > >
> > > > > Actually, it seems we only need to propagate the global epoch to brokers and clients, without propagating this epoch on a per-topic or per-partition basis. Doing so would simplify the interface changes made in this KIP. Does this approach sound reasonable?
> > > > >
> > > > > > 66. A client may also get an offset by time using the offsetForTimes() api. So, we probably want to include offsetInternalMetadata in OffsetAndTimestamp as well.
> > > > > You are right. This probably also requires us to change the ListOffsetRequest as well. I will update the KIP after we agree on the solution for 65).
> > > > >
> > > > > > 67. InternalMetadata can be a bit confusing with the metadata field already there. Perhaps we can just call it OffsetEpoch. It might be useful to make OffsetEpoch printable at least for debugging purposes. Once you do that, we are already exposing the internal fields. So, not sure if it's worth hiding them. If we do want to hide them, perhaps we can have sth like the following. The binary encoding is probably more efficient than JSON for external storage.
> > > > > >
> > > > > > OffsetEpoch {
> > > > > >  static OffsetEpoch decode(byte[]);
> > > > > >
> > > > > >  public byte[] encode();
> > > > > >
> > > > > >  public String toString();
> > > > > > }
> > > > >
> > > > > Thanks much. I like this solution. I have updated the KIP accordingly.
> > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Jason,
> > > > > > >
> > > > > > > Certainly. This sounds good. I have updated the KIP to clarify that the global epoch will be incremented by 1 each time a topic is deleted.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi Dong,
> > > > > > > >
> > > > > > > > > I think your approach will allow the user to distinguish between the metadata before and after the topic deletion. I also agree that this can potentially be useful to the user. I am just not very sure whether we already have a good use-case to make the additional complexity worthwhile. It seems that this feature is kind of independent of the main problem of this KIP. Could we add this as a future work?
> > > > > > > >
> > > > > > > > Do you think it's fair if we bump the topic epoch on deletion and leave propagation of the epoch for deleted topics for future work? I don't think this adds much complexity and it makes the behavior consistent: every topic mutation results in an epoch bump.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jason
> > > > > > > >
> > > > > > > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hey Ismael,
> > > > > > > > >
> > > > > > > > > I guess we actually need the user to see this field so that the user can store this value in the external store together with the offset. We just prefer the value to be opaque to discourage most users from interpreting this value.
> > > > > > > > > One more advantage of using such an opaque field is to be able to evolve the information (or schema) of this value without changing the consumer API in the future.
> > > > > > > > >
> > > > > > > > > I am also thinking it is probably OK for the user to be able to interpret this value, particularly for those advanced users.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Dong
> > > > > > > > >
> > > > > > > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > >
> > > > > > > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > class OffsetAndMetadata {
> > > > > > > > > > >   long offset;
> > > > > > > > > > >   byte[] offsetMetadata;
> > > > > > > > > > >   String metadata;
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > Admittedly, the naming is a bit annoying, but we can probably come up with something better. Internally the byte array would have a version. If in the future we have anything else we need to add, we can update the version and we wouldn't need any new APIs.
> > > > > > > > > >
> > > > > > > > > > We can also add fields to a class in a compatible way. So, it seems to me that the main advantage of the byte array is that it's opaque to the user. Is that correct? If so, we could also add any opaque metadata in a subclass so that users don't even see it (unless they cast it, but then they're on their own).
> > > > > > > > > >
> > > > > > > > > > Ismael
> > > > > > > > > >
> > > > > > > > > > > The corresponding seek() and position() APIs might look something like this:
> > > > > > > > > > >
> > > > > > > > > > > void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> > > > > > > > > > > byte[] positionMetadata(TopicPartition partition);
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Jason
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Jun, Jason,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks much for all the feedback. I have updated the KIP based on the latest discussion. Can you help check whether it looks good?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Dong
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hmm...
> > > > > > > > > > > > > thinking about this more, I am not sure that the proposed API is sufficient. For users that store offsets externally, we probably need an extra API to return the leader_epoch and partition_epoch for all partitions that consumers are consuming. I suppose these users currently use position() to get the offset. Thus we probably need a new method positionWithEpoch(...) to return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Dong
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks much. I like the new API that you proposed. I am not sure what you exactly mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Got it. The api that you proposed works. The question is whether that's the api that we want to have in the long term. My concern is that while the api change is simple, the new api seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().
> > > > > > > > > > > > > > > > An alternative approach is to make the following compatible api changes in Consumer.
> > > > > > > > > > > > > > > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no need to change the commitSync() api)
> > > > > > > > > > > > > > > > * Add a new api seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old api seek(TopicPartition partition, long offset) in the future.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The alternative approach has a similar amount of api changes as yours but has the following benefits.
> > > > > > > > > > > > > > > > 1. The api works in a similar way as how offset management works now and is probably what we want in the long term.
> > > > > > > > > > > > > > > > 2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
> > > > > > > > > > > > > > > > 3. It can reset offsets better when a topic is recreated.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Yeah I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > In my current proposal, the metadata version returned in the fetch response will be stored with the offset together. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponse and FetchResponse ever received by this consumer.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>).
> > > > > > > > > > > > > > > > > If the user calls commitSync(...) to commit offset 10 for a given partition, for most use-cases, this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If the user calls commitSync(...) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use-case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Not sure that I fully understand your latest suggestion. Returning an ever-growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored with the offset together if offsets are stored externally.
> > > > > > > > > > > > > > > > > > If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, on a consumer restart, it's not clear how we can ensure the metadata in the consumer is high enough since there is no metadata version to compare with.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks much for the explanation.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and the partition_epoch requires considerable change to the consumer's public API to take care of the case where the user stores offsets externally. For example, consumer.commitSync(...) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for most users, who do not store offsets externally.
> > > > > > > > > > > > > > > > > > > After thinking more about it, we can address all the problems discussed by only using the metadata_epoch, without introducing leader_epoch or the partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if the user stores offsets externally. In order to address the scenario you described earlier, we can include metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch from all the FetchResponse it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponse and MetadataResponse ever received by this consumer.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > The drawback of using only the metadata_epoch is that we can not always do the smart offset reset in case of unclean leader election, which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees OffsetOutOfRangeException.
> > > > > > > > > > > > > > > > > > > So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi, Dong,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks for the reply.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > To solve the topic recreation issue, we could use either a global metadata version or a partition level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets.
> > > > > > > > > > > > > > > > > > > > If a topic is recreated, it's possible that a consumer's offset and leader epoch still match that in the broker, but the partition epoch won't be. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Hey Jun,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch) and do not require any change to the message format or the leader epoch.
> > > > > > > > > > > > > > > > > > > > > It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Previously one concern with using the metadata version is that the consumer will be forced to refresh metadata even if the metadata version is increased due to topics that the consumer is not interested in. Now I realize that this is probably not a problem. Currently the client will refresh metadata either due to InvalidMetadataException in the response from the broker or due to metadata expiry. The addition of the metadata version should not increase the overhead of metadata refresh caused by InvalidMetadataException. If the client refreshes metadata due to expiry and it receives metadata whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.
> > > > > > > > > > > > > > > > > > > > > Thanks much,
> > > > > > > > > > > > > > > > > > > > > Dong