Hi, Dong,

Thanks for the updated KIP. It looks good to me now. The only remaining thing is OffsetEpoch: if we expose the individual fields in the class, we probably don't need the encode/decode methods; if we want to hide the details of OffsetEpoch, we probably don't want to expose the individual fields.
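As an illustration of the trade-off, the two shapes being contrasted could look roughly like this (class and method names are placeholders, and OffsetEpoch is assumed to wrap the (partition_epoch, leader_epoch) pair agreed on further down in the thread):

// Shape 1 (hypothetical): expose the fields; callers persist the two numbers however
// they like, so encode()/decode() add little.
class OffsetEpochWithFields {
    private final int partitionEpoch;
    private final int leaderEpoch;

    OffsetEpochWithFields(int partitionEpoch, int leaderEpoch) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    int partitionEpoch() { return partitionEpoch; }
    int leaderEpoch() { return leaderEpoch; }
}

// Shape 2 (hypothetical): keep the contents opaque; callers only get an encoded byte[]
// (plus toString() for debugging), so the internal layout can evolve without API changes.
class OpaqueOffsetEpochShape {
    private final byte[] encoded;

    private OpaqueOffsetEpochShape(byte[] encoded) { this.encoded = encoded; }

    static OpaqueOffsetEpochShape decode(byte[] bytes) { return new OpaqueOffsetEpochShape(bytes.clone()); }
    byte[] encode() { return encoded.clone(); }
    @Override public String toString() { return java.util.Arrays.toString(encoded); }
}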
Jun On Wed, Jan 17, 2018 at 10:10 AM, Dong Lin <lindon...@gmail.com> wrote: > Thinking about point 61 more, I realize that the async zookeeper read may > make it less of an issue for controller to read more zookeeper nodes. > Writing partition_epoch in the per-partition znode makes it simpler to > handle the broker failure between zookeeper writes for a topic creation. I > have updated the KIP to use the suggested approach. > > > On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin <lindon...@gmail.com> wrote: > > > Hey Jun, > > > > Thanks much for the comments. Please see my comments inline. > > > > On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io> wrote: > > > >> Hi, Dong, > >> > >> Thanks for the updated KIP. Looks good to me overall. Just a few minor > >> comments. > >> > >> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition): > >> It > >> seems that there is no need to return metadata. We probably want to > return > >> sth like OffsetAndEpoch. > >> > > > > Previously I think we may want to re-use the existing class to keep our > > consumer interface simpler. I have updated the KIP to add class > > OffsetAndOffsetEpoch. I didn't use OffsetAndEpoch because user may > confuse > > this name with OffsetEpoch. Does this sound OK? > > > > > >> > >> 61. Should we store partition_epoch in > >> /brokers/topics/[topic]/partitions/[partitionId] in ZK? > >> > > > > I have considered this. I think the advantage of adding the > > partition->partition_epoch map in the existing > > znode /brokers/topics/[topic]/partitions is that controller only needs > to > > read one znode per topic to gets its partition_epoch information. > Otherwise > > controller may need to read one extra znode per partition to get the same > > information. > > > > When we delete partition or expand partition of a topic, someone needs to > > modify partition->partition_epoch map in znode > > /brokers/topics/[topic]/partitions. This may seem a bit more complicated > > than simply adding or deleting znode /brokers/topics/[topic]/ > partitions/[partitionId]. > > But the complexity is probably similar to the existing operation of > > modifying the partition->replica_list mapping in znode > > /brokers/topics/[topic]. So not sure it is better to store the > > partition_epoch in /brokers/topics/[topic]/partitions/[partitionId]. > What > > do you think? > > > > > >> > >> 62. For checking outdated metadata in the client, we probably want to > add > >> when max_partition_epoch will be used. > >> > > > > The max_partition_epoch is used in the Proposed Changes -> Client's > > metadata refresh section to determine whether a metadata is outdated. And > > this formula is referenced and re-used in other sections to determine > > whether a metadata is outdated. Does this formula look OK? > > > > > >> > >> 63. "The leader_epoch should be the largest leader_epoch of messages > whose > >> offset < the commit offset. If no message has been consumed since > consumer > >> initialization, the leader_epoch from seek(...) or OffsetFetchResponse > >> should be used. The partition_epoch should be read from the last > >> FetchResponse corresponding to the given partition and commit offset. ": > >> leader_epoch and partition_epoch are associated with an offset. So, if > no > >> message is consumed, there is no offset and therefore there is no need > to > >> read leader_epoch and partition_epoch. Also, the leader_epoch associated > >> with the offset should just come from the messages returned in the fetch > >> response. 
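To make point 63 concrete for readers following along, here is a minimal sketch of the selection rule (Dong restates it in the reply just below); the bookkeeping structure and names are my own assumptions rather than the KIP's:

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical per-partition bookkeeping for choosing the leader_epoch to commit:
// the largest leader_epoch among consumed records with offset < commitOffset, falling
// back to the epoch supplied via seek(...) or the OffsetFetchResponse when nothing has
// been consumed yet. (The partition_epoch would simply be taken from the most recent
// FetchResponse for this partition.)
class CommitLeaderEpochChooser {
    private final NavigableMap<Long, Integer> consumedOffsetToLeaderEpoch = new TreeMap<>();
    private Integer fallbackLeaderEpoch; // from seek(...) or OffsetFetchResponse

    void onSeekOrOffsetFetch(int leaderEpoch) {
        fallbackLeaderEpoch = leaderEpoch;
    }

    void onRecordConsumed(long offset, int leaderEpoch) {
        consumedOffsetToLeaderEpoch.put(offset, leaderEpoch);
    }

    Integer leaderEpochForCommit(long commitOffset) {
        // Leader epochs never decrease as offsets grow, so the consumed record with the
        // largest offset strictly below commitOffset carries the largest leader_epoch.
        // The "offset < commitOffset" condition also tolerates offset gaps left by
        // log compaction.
        Map.Entry<Long, Integer> last =
                consumedOffsetToLeaderEpoch.headMap(commitOffset, false).lastEntry();
        return last != null ? last.getValue() : fallbackLeaderEpoch;
    }
}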
> >> > > > > I am thinking that, if user calls seek(..) and commitSync(...) without > > consuming any messages, we should re-use the leader_epoch and > > partition_epoch provided by the seek(...) in the OffsetCommitRequest. And > > if messages have been successfully consumed, then leader_epoch will come > > from the messages returned in the fetch response. The condition "messages > > whose offset < the commit offset" is needed to take care of the log > > compacted topic which may have offset gap due to log cleaning. > > > > Did I miss something here? Or should I rephrase the paragraph to make it > > less confusing? > > > > > >> 64. Could you include the public methods in the OffsetEpoch class? > >> > > > > I mistakenly deleted the definition of OffsetEpoch class from the KIP. I > > just added it back with the public methods. Could you take another look? > > > > > >> > >> Jun > >> > >> > >> On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote: > >> > >> > Hey Jun, > >> > > >> > Thanks much. I agree that we can not rely on committed offsets to be > >> always > >> > deleted when we delete topic. So it is necessary to use a > per-partition > >> > epoch that does not change unless this partition is deleted. I also > >> agree > >> > that it is very nice to be able to uniquely identify a message with > >> > (offset, leader_epoch, partition_epoch) in face of potential topic > >> deletion > >> > and unclean leader election. > >> > > >> > I agree with all your comments. And I have updated the KIP based on > our > >> > latest discussion. In addition, I added InvalidPartitionEpochException > >> > which will be thrown by consumer.poll() if the partition_epoch > >> associated > >> > with the partition, which can be given to consumer using seek(...), is > >> > different from the partition_epoch in the FetchResponse. > >> > > >> > Can you take another look at the latest KIP? > >> > > >> > Thanks! > >> > Dong > >> > > >> > > >> > > >> > On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <j...@confluent.io> wrote: > >> > > >> > > Hi, Dong, > >> > > > >> > > My replies are the following. > >> > > > >> > > 60. What you described could also work. The drawback is that we will > >> be > >> > > unnecessarily changing the partition epoch when a partition hasn't > >> really > >> > > changed. I was imagining that the partition epoch will be stored in > >> > > /brokers/topics/[topic]/partitions/[partitionId], instead of at the > >> > topic > >> > > level. So, not sure if ZK size limit is an issue. > >> > > > >> > > 61, 62 and 65. To me, the offset + offset_epoch is a unique > identifier > >> > for > >> > > a message. So, if a message hasn't changed, the offset and the > >> associated > >> > > offset_epoch ideally should remain the same (it will be kind of > weird > >> if > >> > > two consumer apps save the offset on the same message, but the > >> > offset_epoch > >> > > are different). partition_epoch + leader_epoch give us that. > >> > global_epoch + > >> > > leader_epoch don't. If we use this approach, we can solve not only > the > >> > > problem that you have identified, but also other problems when there > >> is > >> > > data loss or topic re-creation more reliably. For example, in the > >> future, > >> > > if we include the partition_epoch and leader_epoch in the fetch > >> request, > >> > > the server can do a more reliable check of whether that offset is > >> valid > >> > or > >> > > not. I am not sure that we can rely upon all external offsets to be > >> > removed > >> > > on topic deletion. 
For example, a topic may be deleted by an admin > who > >> > may > >> > > not know all the applications. > >> > > > >> > > If we agree on the above, the second question is then how to > reliably > >> > > propagate the partition_epoch and the leader_epoch to the consumer > >> when > >> > > there are leader or partition changes. The leader_epoch comes from > the > >> > > message, which is reliable. So, I was suggesting that when we store > an > >> > > offset, we can just store the leader_epoch from the message set > >> > containing > >> > > that offset. Similarly, I was thinking that if the partition_epoch > is > >> in > >> > > the fetch response, we can propagate partition_epoch reliably where > is > >> > > partition_epoch change. > >> > > > >> > > 63. My point is that once a leader is producing a message in the new > >> > > partition_epoch, ideally, we should associate the new offsets with > the > >> > new > >> > > partition_epoch. Otherwise, the offset_epoch won't be the correct > >> unique > >> > > identifier (useful for solving other problems mentioned above). I > was > >> > > originally thinking that the leader will include the partition_epoch > >> in > >> > the > >> > > metadata cache in the fetch response. It's just that right now, > >> metadata > >> > > cache is updated on UpdateMetadataRequest, which typically happens > >> after > >> > > the LeaderAndIsrRequest. Another approach is for the leader to cache > >> the > >> > > partition_epoch in the Partition object and return that (instead of > >> the > >> > one > >> > > in metadata cache) in the fetch response. > >> > > > >> > > 65. It seems to me that the global_epoch and the partition_epoch > have > >> > > different purposes. A partition_epoch has the benefit that it (1) > can > >> be > >> > > used to form a unique identifier for a message and (2) can be used > to > >> > > solve other > >> > > corner case problems in the future. I am not sure having just a > >> > > global_epoch can achieve these. global_epoch is useful to determine > >> which > >> > > version of the metadata is newer, especially with topic deletion. > >> > > > >> > > Thanks, > >> > > > >> > > Jun > >> > > > >> > > On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindon...@gmail.com> > >> wrote: > >> > > > >> > > > Regarding the use of the global epoch in 65), it is very similar > to > >> the > >> > > > proposal of the metadata_epoch we discussed earlier. The main > >> > difference > >> > > is > >> > > > that this epoch is incremented when we create/expand/delete topic > >> and > >> > > does > >> > > > not change when controller re-send metadata. > >> > > > > >> > > > I looked at our previous discussion. It seems that we prefer > >> > > > partition_epoch over the metadata_epoch because 1) we prefer not > to > >> > have > >> > > an > >> > > > ever growing metadata_epoch and 2) we can reset offset better when > >> > topic > >> > > is > >> > > > re-created. The use of global topic_epoch avoids the drawback of > an > >> > ever > >> > > > quickly ever growing metadata_epoch. Though the global epoch does > >> not > >> > > allow > >> > > > us to recognize the invalid offset committed before the topic > >> > > re-creation, > >> > > > we can probably just delete the offset when we delete a topic. > Thus > >> I > >> > am > >> > > > not very sure whether it is still worthwhile to have a > per-partition > >> > > > partition_epoch if the metadata already has the global epoch. 
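For readers weighing the two options above, a toy contrast of what each epoch lets the client decide; the helper names here are invented for illustration:

// Toy contrast (invented helper names) of what each epoch enables on the client side.
final class EpochChecks {
    private EpochChecks() { }

    // A global epoch, bumped on every topic create/expand/delete, is enough to order
    // two metadata snapshots.
    static boolean metadataIsNewer(long candidateGlobalEpoch, long cachedGlobalEpoch) {
        return candidateGlobalEpoch > cachedGlobalEpoch;
    }

    // A per-partition epoch additionally lets the client recognize that a committed
    // offset refers to an earlier incarnation of a deleted-and-recreated partition.
    static boolean committedOffsetStillMeaningful(int committedPartitionEpoch,
                                                  int currentPartitionEpoch) {
        return committedPartitionEpoch == currentPartitionEpoch;
    }
}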
> >> > > > > >> > > > > >> > > > On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> > >> wrote: > >> > > > > >> > > > > Hey Jun, > >> > > > > > >> > > > > Thanks so much. These comments very useful. Please see below my > >> > > comments. > >> > > > > > >> > > > > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> > wrote: > >> > > > > > >> > > > >> Hi, Dong, > >> > > > >> > >> > > > >> Thanks for the updated KIP. A few more comments. > >> > > > >> > >> > > > >> 60. Perhaps having a partition epoch is more flexible since in > >> the > >> > > > future, > >> > > > >> we may support deleting a partition as well. > >> > > > >> > >> > > > > > >> > > > > Yeah I have considered this. I think we can probably still > support > >> > > > > deleting a partition by using the topic_epoch -- when partition > >> of a > >> > > > topic > >> > > > > is deleted or created, epoch of all partitions of this topic > will > >> be > >> > > > > incremented by 1. Therefore, if that partition is re-created > >> later, > >> > the > >> > > > > epoch of that partition will still be larger than its epoch > before > >> > the > >> > > > > deletion, which still allows the client to order the metadata > for > >> the > >> > > > > purpose of this KIP. Does this sound reasonable? > >> > > > > > >> > > > > The advantage of using topic_epoch instead of partition_epoch is > >> that > >> > > the > >> > > > > size of the /brokers/topics/[topic] znode and request/response > >> size > >> > can > >> > > > be > >> > > > > smaller. We have a limit on the maximum size of znode (typically > >> > 1MB). > >> > > > Use > >> > > > > partition epoch can effectively reduce the number of partitions > >> that > >> > > can > >> > > > be > >> > > > > described by the /brokers/topics/[topic] znode. > >> > > > > > >> > > > > One use-case of partition_epoch for client to detect that the > >> > committed > >> > > > > offset, either from kafka offset topic or from the external > store > >> is > >> > > > > invalid after partition deletion and re-creation. However, it > >> seems > >> > > that > >> > > > we > >> > > > > can also address this use-case with other approaches. For > example, > >> > when > >> > > > > AdminClient deletes partitions, it can also delete the committed > >> > > offsets > >> > > > > for those partitions from the offset topic. If user stores > offset > >> > > > > externally, it might make sense for user to similarly remove > >> offsets > >> > of > >> > > > > related partitions after these partitions are deleted. So I am > not > >> > sure > >> > > > > that we should use partition_epoch in this KIP. > >> > > > > > >> > > > > > >> > > > >> > >> > > > >> 61. It seems that the leader epoch returned in the position() > >> call > >> > > > should > >> > > > >> the the leader epoch returned in the fetch response, not the > one > >> in > >> > > the > >> > > > >> metadata cache of the client. > >> > > > > > >> > > > > > >> > > > > I think this is a good idea. Just to double check, this change > >> does > >> > not > >> > > > > affect the correctness or performance of this KIP. But it can be > >> > useful > >> > > > if > >> > > > > we want to use the leader_epoch to better handle the offset rest > >> in > >> > > case > >> > > > of > >> > > > > unclean leader election, which is listed in the future work. Is > >> this > >> > > > > understanding correct? 
> >> > > > > > >> > > > > I have updated the KIP to specify that the leader_epoch returned > >> by > >> > > > > position() should be the largest leader_epoch of those already > >> > consumed > >> > > > > messages whose offset < position. If no message has been > consumed > >> > since > >> > > > > consumer initialization, the leader_epoch from seek() or > >> > > > > OffsetFetchResponse should be used. The offset included in the > >> > > > > OffsetCommitRequest will also be determined in the similar > manner. > >> > > > > > >> > > > > > >> > > > >> > >> > > > >> 62. I am wondering if we should return the partition epoch in > the > >> > > fetch > >> > > > >> response as well. In the current proposal, if a topic is > >> recreated > >> > and > >> > > > the > >> > > > >> new leader is on the same broker as the old one, there is > >> nothing to > >> > > > force > >> > > > >> the metadata refresh in the client. So, the client may still > >> > associate > >> > > > the > >> > > > >> offset with the old partition epoch. > >> > > > >> > >> > > > > > >> > > > > Could you help me understand the problem if a client associates > >> old > >> > > > > partition_epoch (or the topic_epoch as of the current KIP) with > >> the > >> > > > offset? > >> > > > > The main purpose of the topic_epoch is to be able to drop > >> > leader_epoch > >> > > > to 0 > >> > > > > after a partition is deleted and re-created. I guess you may be > >> > > thinking > >> > > > > about using the partition_epoch to detect that the committed > >> offset > >> > is > >> > > > > invalid? In that case, I am wondering if the alternative > approach > >> > > > described > >> > > > > in 60) would be reasonable. > >> > > > > > >> > > > > > >> > > > >> > >> > > > >> 63. There is some subtle coordination between the > >> > LeaderAndIsrRequest > >> > > > and > >> > > > >> UpdateMetadataRequest. Currently, when a leader changes, the > >> > > controller > >> > > > >> first sends the LeaderAndIsrRequest to the assigned replicas > and > >> the > >> > > > >> UpdateMetadataRequest to every broker. So, there could be a > small > >> > > window > >> > > > >> when the leader already receives the new partition epoch in the > >> > > > >> LeaderAndIsrRequest, but the metadata cache in the broker > hasn't > >> > been > >> > > > >> updated with the latest partition epoch. Not sure what's the > best > >> > way > >> > > to > >> > > > >> address this issue. Perhaps we can update the metadata cache on > >> the > >> > > > broker > >> > > > >> with both LeaderAndIsrRequest and UpdateMetadataRequest. The > >> > challenge > >> > > > is > >> > > > >> that the two have slightly different data. For example, only > the > >> > > latter > >> > > > >> has > >> > > > >> all endpoints. > >> > > > >> > >> > > > > > >> > > > > I am not sure whether this is a problem. Could you explain a bit > >> more > >> > > > what > >> > > > > specific problem this small window can cause? > >> > > > > > >> > > > > Since client can fetch metadata from any broker in the cluster, > >> and > >> > > given > >> > > > > that different brokers receive request (e.g. LeaderAndIsrRequest > >> and > >> > > > > UpdateMetadataRequest) in arbitrary order, the metadata received > >> by > >> > > > client > >> > > > > can be in arbitrary order (either newer or older) compared to > the > >> > > > broker's > >> > > > > leadership state even if a given broker receives > >> LeaderAndIsrRequest > >> > > and > >> > > > > UpdateMetadataRequest simultaneously. 
So I am not sure it is > >> useful > >> > to > >> > > > > update broker's cache with LeaderAndIsrRequest. > >> > > > > > >> > > > > > >> > > > >> 64. The enforcement of leader epoch in Offset commit: We allow > a > >> > > > consumer > >> > > > >> to set an arbitrary offset. So it's possible for offsets or > >> leader > >> > > epoch > >> > > > >> to > >> > > > >> go backwards. I am not sure if we could always enforce that the > >> > leader > >> > > > >> epoch only goes up on the broker. > >> > > > >> > >> > > > > > >> > > > > Sure. I have removed this check from the KIP. > >> > > > > > >> > > > > BTW, we can probably still ensure that the leader_epoch always > >> > increase > >> > > > if > >> > > > > the leader_epoch used with offset commit is the max(leader_epoch > >> of > >> > the > >> > > > > message with offset = the committed offset - 1, the largest > known > >> > > > > leader_epoch from the metadata). But I don't have a good > use-case > >> for > >> > > > this > >> > > > > alternative definition. So I choose the keep the KIP simple by > >> > > requiring > >> > > > > leader_epoch to always increase. > >> > > > > > >> > > > > > >> > > > >> 65. Good point on handling missing partition epoch due to topic > >> > > > deletion. > >> > > > >> Another potential way to address this is to additionally > >> propagate > >> > the > >> > > > >> global partition epoch to brokers and the clients. This way, > >> when a > >> > > > >> partition epoch is missing, we can use the global partition > >> epoch to > >> > > > >> reason > >> > > > >> about which metadata is more recent. > >> > > > >> > >> > > > > > >> > > > > This is a great idea. The global epoch can be used to order the > >> > > metadata > >> > > > > and help us recognize the more recent metadata if a topic (or > >> > > partition) > >> > > > is > >> > > > > deleted and re-created. > >> > > > > > >> > > > > Actually, it seems we only need to propagate the global epoch to > >> > > brokers > >> > > > > and clients without propagating this epoch on a per-topic or > >> > > > per-partition > >> > > > > basic. Doing so would simply interface changes made this KIP. > Does > >> > this > >> > > > > approach sound reasonable? > >> > > > > > >> > > > > > >> > > > >> 66. A client may also get an offset by time using the > >> > offsetForTimes() > >> > > > >> api. > >> > > > >> So, we probably want to include offsetInternalMetadata in > >> > > > >> OffsetAndTimestamp > >> > > > >> as well. > >> > > > >> > >> > > > > > >> > > > > You are right. This probably also requires us to change the > >> > > > > ListOffsetRequest as well. I will update the KIP after we agree > on > >> > the > >> > > > > solution for 65). > >> > > > > > >> > > > > > >> > > > >> > >> > > > >> 67. InteralMetadata can be a bit confusing with the metadata > >> field > >> > > > already > >> > > > >> there. Perhaps we can just call it OffsetEpoch. It might be > >> useful > >> > to > >> > > > make > >> > > > >> OffsetEpoch printable at least for debugging purpose. Once you > do > >> > > that, > >> > > > we > >> > > > >> are already exposing the internal fields. So, not sure if it's > >> worth > >> > > > >> hiding > >> > > > >> them. If we do want to hide them, perhaps we can have sth like > >> the > >> > > > >> following. The binary encoding is probably more efficient than > >> JSON > >> > > for > >> > > > >> external storage. 
> >> > > > >> > >> > > > >> OffsetEpoch { > >> > > > >> static OffsetEpoch decode(byte[]); > >> > > > >> > >> > > > >> public byte[] encode(); > >> > > > >> > >> > > > >> public String toString(); > >> > > > >> } > >> > > > >> > >> > > > > > >> > > > > Thanks much. I like this solution. I have updated the KIP > >> > accordingly. > >> > > > > > >> > > > > > >> > > > > > >> > > > >> > >> > > > >> Jun > >> > > > >> > >> > > > >> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> > >> > wrote: > >> > > > >> > >> > > > >> > Hey Jason, > >> > > > >> > > >> > > > >> > Certainly. This sounds good. I have updated the KIP to > clarity > >> > that > >> > > > the > >> > > > >> > global epoch will be incremented by 1 each time a topic is > >> > deleted. > >> > > > >> > > >> > > > >> > Thanks, > >> > > > >> > Dong > >> > > > >> > > >> > > > >> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson < > >> > ja...@confluent.io > >> > > > > >> > > > >> > wrote: > >> > > > >> > > >> > > > >> > > Hi Dong, > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > I think your approach will allow user to distinguish > between > >> the > >> > > > >> metadata > >> > > > >> > > > before and after the topic deletion. I also agree that > this > >> > can > >> > > be > >> > > > >> > > > potentially be useful to user. I am just not very sure > >> whether > >> > > we > >> > > > >> > already > >> > > > >> > > > have a good use-case to make the additional complexity > >> > > worthwhile. > >> > > > >> It > >> > > > >> > > seems > >> > > > >> > > > that this feature is kind of independent of the main > >> problem > >> > of > >> > > > this > >> > > > >> > KIP. > >> > > > >> > > > Could we add this as a future work? > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > Do you think it's fair if we bump the topic epoch on > deletion > >> > and > >> > > > >> leave > >> > > > >> > > propagation of the epoch for deleted topics for future > work? > >> I > >> > > don't > >> > > > >> > think > >> > > > >> > > this adds much complexity and it makes the behavior > >> consistent: > >> > > > every > >> > > > >> > topic > >> > > > >> > > mutation results in an epoch bump. > >> > > > >> > > > >> > > > >> > > Thanks, > >> > > > >> > > Jason > >> > > > >> > > > >> > > > >> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin < > >> lindon...@gmail.com> > >> > > > wrote: > >> > > > >> > > > >> > > > >> > > > Hey Ismael, > >> > > > >> > > > > >> > > > >> > > > I guess we actually need user to see this field so that > >> user > >> > can > >> > > > >> store > >> > > > >> > > this > >> > > > >> > > > value in the external store together with the offset. We > >> just > >> > > > prefer > >> > > > >> > the > >> > > > >> > > > value to be opaque to discourage most users from > >> interpreting > >> > > this > >> > > > >> > value. > >> > > > >> > > > One more advantage of using such an opaque field is to be > >> able > >> > > to > >> > > > >> > evolve > >> > > > >> > > > the information (or schema) of this value without > changing > >> > > > consumer > >> > > > >> API > >> > > > >> > > in > >> > > > >> > > > the future. > >> > > > >> > > > > >> > > > >> > > > I also thinking it is probably OK for user to be able to > >> > > interpret > >> > > > >> this > >> > > > >> > > > value, particularly for those advanced users. 
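To make the encode/decode idea concrete, one purely illustrative layout is a small version prefix followed by the two epochs, so the contents stay opaque to most users while the format can still evolve; the field set and byte layout here are assumptions, not the KIP's final choice:

import java.nio.ByteBuffer;

// Purely illustrative binary layout for the opaque value: version, partition_epoch,
// leader_epoch. The real KIP may choose a different encoding.
class OpaqueOffsetEpoch {
    private static final short VERSION = 0;

    private final int partitionEpoch;
    private final int leaderEpoch;

    OpaqueOffsetEpoch(int partitionEpoch, int leaderEpoch) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + 4);
        buf.putShort(VERSION);
        buf.putInt(partitionEpoch);
        buf.putInt(leaderEpoch);
        return buf.array();
    }

    static OpaqueOffsetEpoch decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        short version = buf.getShort();
        if (version != VERSION)
            throw new IllegalArgumentException("Unsupported OffsetEpoch version " + version);
        return new OpaqueOffsetEpoch(buf.getInt(), buf.getInt());
    }

    @Override
    public String toString() {
        return "OffsetEpoch(partitionEpoch=" + partitionEpoch + ", leaderEpoch=" + leaderEpoch + ")";
    }
}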
> >> > > > >> > > > > >> > > > >> > > > Thanks, > >> > > > >> > > > Dong > >> > > > >> > > > > >> > > > >> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma < > >> > ism...@juma.me.uk> > >> > > > >> wrote: > >> > > > >> > > > > >> > > > >> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson < > >> > > > >> ja...@confluent.io> > >> > > > >> > > > > wrote: > >> > > > >> > > > > > > >> > > > >> > > > > > class OffsetAndMetadata { > >> > > > >> > > > > > long offset; > >> > > > >> > > > > > byte[] offsetMetadata; > >> > > > >> > > > > > String metadata; > >> > > > >> > > > > > } > >> > > > >> > > > > > >> > > > >> > > > > > >> > > > >> > > > > > Admittedly, the naming is a bit annoying, but we can > >> > > probably > >> > > > >> come > >> > > > >> > up > >> > > > >> > > > > with > >> > > > >> > > > > > something better. Internally the byte array would > have > >> a > >> > > > >> version. > >> > > > >> > If > >> > > > >> > > in > >> > > > >> > > > > the > >> > > > >> > > > > > future we have anything else we need to add, we can > >> update > >> > > the > >> > > > >> > > version > >> > > > >> > > > > and > >> > > > >> > > > > > we wouldn't need any new APIs. > >> > > > >> > > > > > > >> > > > >> > > > > > >> > > > >> > > > > We can also add fields to a class in a compatible way. > >> So, > >> > it > >> > > > >> seems > >> > > > >> > to > >> > > > >> > > me > >> > > > >> > > > > that the main advantage of the byte array is that it's > >> > opaque > >> > > to > >> > > > >> the > >> > > > >> > > > user. > >> > > > >> > > > > Is that correct? If so, we could also add any opaque > >> > metadata > >> > > > in a > >> > > > >> > > > subclass > >> > > > >> > > > > so that users don't even see it (unless they cast it, > but > >> > then > >> > > > >> > they're > >> > > > >> > > on > >> > > > >> > > > > their own). > >> > > > >> > > > > > >> > > > >> > > > > Ismael > >> > > > >> > > > > > >> > > > >> > > > > The corresponding seek() and position() APIs might look > >> > > > something > >> > > > >> > like > >> > > > >> > > > > this: > >> > > > >> > > > > > > >> > > > >> > > > > > void seek(TopicPartition partition, long offset, > byte[] > >> > > > >> > > > offsetMetadata); > >> > > > >> > > > > > byte[] positionMetadata(TopicPartition partition); > >> > > > >> > > > > > > >> > > > >> > > > > > What do you think? > >> > > > >> > > > > > > >> > > > >> > > > > > Thanks, > >> > > > >> > > > > > Jason > >> > > > >> > > > > > > >> > > > >> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin < > >> > > lindon...@gmail.com > >> > > > > > >> > > > >> > > wrote: > >> > > > >> > > > > > > >> > > > >> > > > > > > Hey Jun, Jason, > >> > > > >> > > > > > > > >> > > > >> > > > > > > Thanks much for all the feedback. I have updated > the > >> KIP > >> > > > >> based on > >> > > > >> > > the > >> > > > >> > > > > > > latest discussion. Can you help check whether it > >> looks > >> > > good? > >> > > > >> > > > > > > > >> > > > >> > > > > > > Thanks, > >> > > > >> > > > > > > Dong > >> > > > >> > > > > > > > >> > > > >> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin < > >> > > > lindon...@gmail.com > >> > > > >> > > >> > > > >> > > > wrote: > >> > > > >> > > > > > > > >> > > > >> > > > > > > > Hey Jun, > >> > > > >> > > > > > > > > >> > > > >> > > > > > > > Hmm... thinking about this more, I am not sure > that > >> > the > >> > > > >> > proposed > >> > > > >> > > > API > >> > > > >> > > > > is > >> > > > >> > > > > > > > sufficient. 
For users that store offset > >> externally, we > >> > > > >> probably > >> > > > >> > > > need > >> > > > >> > > > > > > extra > >> > > > >> > > > > > > > API to return the leader_epoch and > partition_epoch > >> for > >> > > all > >> > > > >> > > > partitions > >> > > > >> > > > > > > that > >> > > > >> > > > > > > > consumers are consuming. I suppose these users > >> > currently > >> > > > use > >> > > > >> > > > > position() > >> > > > >> > > > > > > to > >> > > > >> > > > > > > > get the offset. Thus we probably need a new > method > >> > > > >> > > > > > positionWithEpoch(..) > >> > > > >> > > > > > > to > >> > > > >> > > > > > > > return <offset, partition_epoch, leader_epoch>. > >> Does > >> > > this > >> > > > >> sound > >> > > > >> > > > > > > reasonable? > >> > > > >> > > > > > > > > >> > > > >> > > > > > > > Thanks, > >> > > > >> > > > > > > > Dong > >> > > > >> > > > > > > > > >> > > > >> > > > > > > > > >> > > > >> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao < > >> > > j...@confluent.io > >> > > > > > >> > > > >> > > wrote: > >> > > > >> > > > > > > > > >> > > > >> > > > > > > >> Hi, Dong, > >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> Yes, that's what I am thinking. OffsetEpoch will > >> be > >> > > > >> composed > >> > > > >> > of > >> > > > >> > > > > > > >> (partition_epoch, > >> > > > >> > > > > > > >> leader_epoch). > >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> Thanks, > >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> Jun > >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin < > >> > > > >> lindon...@gmail.com > >> > > > >> > > > >> > > > >> > > > > wrote: > >> > > > >> > > > > > > >> > >> > > > >> > > > > > > >> > Hey Jun, > >> > > > >> > > > > > > >> > > >> > > > >> > > > > > > >> > Thanks much. I like the the new API that you > >> > > proposed. > >> > > > I > >> > > > >> am > >> > > > >> > > not > >> > > > >> > > > > sure > >> > > > >> > > > > > > >> what > >> > > > >> > > > > > > >> > you exactly mean by offset_epoch. I suppose > >> that we > >> > > can > >> > > > >> use > >> > > > >> > > the > >> > > > >> > > > > pair > >> > > > >> > > > > > > of > >> > > > >> > > > > > > >> > (partition_epoch, leader_epoch) as the > >> > offset_epoch, > >> > > > >> right? > >> > > > >> > > > > > > >> > > >> > > > >> > > > > > > >> > Thanks, > >> > > > >> > > > > > > >> > Dong > >> > > > >> > > > > > > >> > > >> > > > >> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao < > >> > > > >> j...@confluent.io> > >> > > > >> > > > wrote: > >> > > > >> > > > > > > >> > > >> > > > >> > > > > > > >> > > Hi, Dong, > >> > > > >> > > > > > > >> > > > >> > > > >> > > > > > > >> > > Got it. The api that you proposed works. The > >> > > question > >> > > > >> is > >> > > > >> > > > whether > >> > > > >> > > > > > > >> that's > >> > > > >> > > > > > > >> > the > >> > > > >> > > > > > > >> > > api that we want to have in the long term. > My > >> > > concern > >> > > > >> is > >> > > > >> > > that > >> > > > >> > > > > > while > >> > > > >> > > > > > > >> the > >> > > > >> > > > > > > >> > api > >> > > > >> > > > > > > >> > > change is simple, the new api seems harder > to > >> > > explain > >> > > > >> and > >> > > > >> > > use. 
> >> > > > >> > > > > For > >> > > > >> > > > > > > >> > example, > >> > > > >> > > > > > > >> > > a consumer storing offsets externally now > >> needs > >> > to > >> > > > call > >> > > > >> > > > > > > >> > > waitForMetadataUpdate() after calling > seek(). > >> > > > >> > > > > > > >> > > > >> > > > >> > > > > > > >> > > An alternative approach is to make the > >> following > >> > > > >> > compatible > >> > > > >> > > > api > >> > > > >> > > > > > > >> changes > >> > > > >> > > > > > > >> > in > >> > > > >> > > > > > > >> > > Consumer. > >> > > > >> > > > > > > >> > > * Add an additional OffsetEpoch field in > >> > > > >> > OffsetAndMetadata. > >> > > > >> > > > (no > >> > > > >> > > > > > need > >> > > > >> > > > > > > >> to > >> > > > >> > > > > > > >> > > change the CommitSync() api) > >> > > > >> > > > > > > >> > > * Add a new api seek(TopicPartition > partition, > >> > long > >> > > > >> > offset, > >> > > > >> > > > > > > >> OffsetEpoch > >> > > > >> > > > > > > >> > > offsetEpoch). We can potentially deprecate > the > >> > old > >> > > > api > >> > > > >> > > > > > > >> > seek(TopicPartition > >> > > > >> > > > > > > >> > > partition, long offset) in the future. > >> > > > >> > > > > > > >> > > > >> > > > >> > > > > > > >> > > The alternative approach has similar amount > of > >> > api > >> > > > >> changes > >> > > > >> > > as > >> > > > >> > > > > > yours > >> > > > >> > > > > > > >> but > >> > > > >> > > > > > > >> > has > >> > > > >> > > > > > > >> > > the following benefits. > >> > > > >> > > > > > > >> > > 1. The api works in a similar way as how > >> offset > >> > > > >> management > >> > > > >> > > > works > >> > > > >> > > > > > now > >> > > > >> > > > > > > >> and > >> > > > >> > > > > > > >> > is > >> > > > >> > > > > > > >> > > probably what we want in the long term. > >> > > > >> > > > > > > >> > > 2. It can reset offsets better when there is > >> data > >> > > > loss > >> > > > >> due > >> > > > >> > > to > >> > > > >> > > > > > > unclean > >> > > > >> > > > > > > >> > > leader election or correlated replica > failure. > >> > > > >> > > > > > > >> > > 3. It can reset offsets better when topic is > >> > > > recreated. > >> > > > >> > > > > > > >> > > > >> > > > >> > > > > > > >> > > Thanks, > >> > > > >> > > > > > > >> > > > >> > > > >> > > > > > > >> > > Jun > >> > > > >> > > > > > > >> > > > >> > > > >> > > > > > > >> > > > >> > > > >> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin < > >> > > > >> > > lindon...@gmail.com > >> > > > >> > > > > > >> > > > >> > > > > > > wrote: > >> > > > >> > > > > > > >> > > > >> > > > >> > > > > > > >> > > > Hey Jun, > >> > > > >> > > > > > > >> > > > > >> > > > >> > > > > > > >> > > > Yeah I agree that ideally we don't want an > >> ever > >> > > > >> growing > >> > > > >> > > > global > >> > > > >> > > > > > > >> metadata > >> > > > >> > > > > > > >> > > > version. I just think it may be more > >> desirable > >> > to > >> > > > >> keep > >> > > > >> > the > >> > > > >> > > > > > > consumer > >> > > > >> > > > > > > >> API > >> > > > >> > > > > > > >> > > > simple. > >> > > > >> > > > > > > >> > > > > >> > > > >> > > > > > > >> > > > In my current proposal, metadata version > >> > returned > >> > > > in > >> > > > >> the > >> > > > >> > > > fetch > >> > > > >> > > > > > > >> response > >> > > > >> > > > > > > >> > > > will be stored with the offset together. 
> >> More > >> > > > >> > > specifically, > >> > > > >> > > > > the > >> > > > >> > > > > > > >> > > > metadata_epoch in the new offset topic > >> schema > >> > > will > >> > > > be > >> > > > >> > the > >> > > > >> > > > > > largest > >> > > > >> > > > > > > >> > > > metadata_epoch from all the > MetadataResponse > >> > and > >> > > > >> > > > FetchResponse > >> > > > >> > > > > > > ever > >> > > > >> > > > > > > >> > > > received by this consumer. > >> > > > >> > > > > > > >> > > > > >> > > > >> > > > > > > >> > > > We probably don't have to change the > >> consumer > >> > API > >> > > > for > >> > > > >> > > > > > > >> > > > commitSync(Map<TopicPartition, > >> > > OffsetAndMetadata>). > >> > > > >> If > >> > > > >> > > user > >> > > > >> > > > > > calls > >> > > > >> > > > > > > >> > > > commitSync(...) to commit offset 10 for a > >> given > >> > > > >> > partition, > >> > > > >> > > > for > >> > > > >> > > > > > > most > >> > > > >> > > > > > > >> > > > use-cases, this consumer instance should > >> have > >> > > > >> consumed > >> > > > >> > > > message > >> > > > >> > > > > > > with > >> > > > >> > > > > > > >> > > offset > >> > > > >> > > > > > > >> > > > 9 from this partition, in which case the > >> > consumer > >> > > > can > >> > > > >> > > > remember > >> > > > >> > > > > > and > >> > > > >> > > > > > > >> use > >> > > > >> > > > > > > >> > > the > >> > > > >> > > > > > > >> > > > metadata_epoch from the corresponding > >> > > FetchResponse > >> > > > >> when > >> > > > >> > > > > > > committing > >> > > > >> > > > > > > >> > > offset. > >> > > > >> > > > > > > >> > > > If user calls commitSync(..) to commit > >> offset > >> > 10 > >> > > > for > >> > > > >> a > >> > > > >> > > given > >> > > > >> > > > > > > >> partition > >> > > > >> > > > > > > >> > > > without having consumed the message with > >> > offset 9 > >> > > > >> using > >> > > > >> > > this > >> > > > >> > > > > > > >> consumer > >> > > > >> > > > > > > >> > > > instance, this is probably an advanced > >> > use-case. > >> > > In > >> > > > >> this > >> > > > >> > > > case > >> > > > >> > > > > > the > >> > > > >> > > > > > > >> > > advanced > >> > > > >> > > > > > > >> > > > user can retrieve the metadata_epoch using > >> the > >> > > > newly > >> > > > >> > added > >> > > > >> > > > > > > >> > > metadataEpoch() > >> > > > >> > > > > > > >> > > > API after it fetches the message with > >> offset 9 > >> > > > >> (probably > >> > > > >> > > > from > >> > > > >> > > > > > > >> another > >> > > > >> > > > > > > >> > > > consumer instance) and encode this > >> > metadata_epoch > >> > > > in > >> > > > >> the > >> > > > >> > > > > > > >> > > > string OffsetAndMetadata.metadata. Do you > >> think > >> > > > this > >> > > > >> > > > solution > >> > > > >> > > > > > > would > >> > > > >> > > > > > > >> > work? > >> > > > >> > > > > > > >> > > > > >> > > > >> > > > > > > >> > > > By "not sure that I fully understand your > >> > latest > >> > > > >> > > > suggestion", > >> > > > >> > > > > > are > >> > > > >> > > > > > > >> you > >> > > > >> > > > > > > >> > > > referring to solution related to unclean > >> leader > >> > > > >> election > >> > > > >> > > > using > >> > > > >> > > > > > > >> > > leader_epoch > >> > > > >> > > > > > > >> > > > in my previous email? 
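A minimal sketch of the bookkeeping Dong describes above, assuming the consumer simply tracks the largest metadata_epoch seen in any MetadataResponse or FetchResponse and, for the advanced use-case, encodes it into the free-form metadata string; the string format below is invented for illustration:

// Hypothetical tracker for the largest metadata_epoch observed by this consumer.
class MetadataEpochTracker {
    private int largestSeenMetadataEpoch = -1;

    // Called for every MetadataResponse or FetchResponse that carries a metadata_epoch.
    void onResponseMetadataEpoch(int metadataEpoch) {
        largestSeenMetadataEpoch = Math.max(largestSeenMetadataEpoch, metadataEpoch);
    }

    int largestSeenMetadataEpoch() {
        return largestSeenMetadataEpoch;
    }

    // Advanced use-case from the discussion: carry the epoch inside the existing
    // free-form OffsetAndMetadata.metadata string (format invented for illustration).
    String metadataStringForCommit(String userMetadata) {
        return "metadata_epoch=" + largestSeenMetadataEpoch + ";" + userMetadata;
    }
}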
> >> > > > >> > > > > > > >> > > > > >> > > > >> > > > > > > >> > > > Thanks, > >> > > > >> > > > > > > >> > > > Dong > >> > > > >> > > > > > > >> > > > > >> > > > >> > > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao < > >> > > > >> > j...@confluent.io > >> > > > >> > > > > >> > > > >> > > > > > wrote: > >> > > > >> > > > > > > >> > > > > >> > > > >> > > > > > > >> > > > > Hi, Dong, > >> > > > >> > > > > > > >> > > > > > >> > > > >> > > > > > > >> > > > > Not sure that I fully understand your > >> latest > >> > > > >> > suggestion. > >> > > > >> > > > > > > >> Returning an > >> > > > >> > > > > > > >> > > > ever > >> > > > >> > > > > > > >> > > > > growing global metadata version itself > is > >> no > >> > > > ideal, > >> > > > >> > but > >> > > > >> > > is > >> > > > >> > > > > > fine. > >> > > > >> > > > > > > >> My > >> > > > >> > > > > > > >> > > > > question is whether the metadata version > >> > > returned > >> > > > >> in > >> > > > >> > the > >> > > > >> > > > > fetch > >> > > > >> > > > > > > >> > response > >> > > > >> > > > > > > >> > > > > needs to be stored with the offset > >> together > >> > if > >> > > > >> offsets > >> > > > >> > > are > >> > > > >> > > > > > > stored > >> > > > >> > > > > > > >> > > > > externally. If so, we also have to > change > >> the > >> > > > >> consumer > >> > > > >> > > API > >> > > > >> > > > > for > >> > > > >> > > > > > > >> > > > commitSync() > >> > > > >> > > > > > > >> > > > > and need to worry about compatibility. > If > >> we > >> > > > don't > >> > > > >> > store > >> > > > >> > > > the > >> > > > >> > > > > > > >> metadata > >> > > > >> > > > > > > >> > > > > version together with the offset, on a > >> > consumer > >> > > > >> > restart, > >> > > > >> > > > > it's > >> > > > >> > > > > > > not > >> > > > >> > > > > > > >> > clear > >> > > > >> > > > > > > >> > > > how > >> > > > >> > > > > > > >> > > > > we can ensure the metadata in the > >> consumer is > >> > > > high > >> > > > >> > > enough > >> > > > >> > > > > > since > >> > > > >> > > > > > > >> there > >> > > > >> > > > > > > >> > > is > >> > > > >> > > > > > > >> > > > no > >> > > > >> > > > > > > >> > > > > metadata version to compare with. > >> > > > >> > > > > > > >> > > > > > >> > > > >> > > > > > > >> > > > > Thanks, > >> > > > >> > > > > > > >> > > > > > >> > > > >> > > > > > > >> > > > > Jun > >> > > > >> > > > > > > >> > > > > > >> > > > >> > > > > > > >> > > > > > >> > > > >> > > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong > Lin < > >> > > > >> > > > > lindon...@gmail.com > >> > > > >> > > > > > > > >> > > > >> > > > > > > >> > wrote: > >> > > > >> > > > > > > >> > > > > > >> > > > >> > > > > > > >> > > > > > Hey Jun, > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > Thanks much for the explanation. > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > I understand the advantage of > >> > partition_epoch > >> > > > >> over > >> > > > >> > > > > > > >> metadata_epoch. 
> >> > > > >> > > > > > > >> > My > >> > > > >> > > > > > > >> > > > > > current concern is that the use of > >> > > leader_epoch > >> > > > >> and > >> > > > >> > > the > >> > > > >> > > > > > > >> > > partition_epoch > >> > > > >> > > > > > > >> > > > > > requires us considerable change to > >> > consumer's > >> > > > >> public > >> > > > >> > > API > >> > > > >> > > > > to > >> > > > >> > > > > > > take > >> > > > >> > > > > > > >> > care > >> > > > >> > > > > > > >> > > > of > >> > > > >> > > > > > > >> > > > > > the case where user stores offset > >> > externally. > >> > > > For > >> > > > >> > > > example, > >> > > > >> > > > > > > >> > > *consumer*. > >> > > > >> > > > > > > >> > > > > > *commitSync*(..) would have to take a > >> map > >> > > whose > >> > > > >> > value > >> > > > >> > > is > >> > > > >> > > > > > > >> <offset, > >> > > > >> > > > > > > >> > > > > metadata, > >> > > > >> > > > > > > >> > > > > > leader epoch, partition epoch>. > >> > > > >> > *consumer*.*seek*(...) > >> > > > >> > > > > would > >> > > > >> > > > > > > >> also > >> > > > >> > > > > > > >> > > need > >> > > > >> > > > > > > >> > > > > > leader_epoch and partition_epoch as > >> > > parameter. > >> > > > >> > > > Technically > >> > > > >> > > > > > we > >> > > > >> > > > > > > >> can > >> > > > >> > > > > > > >> > > > > probably > >> > > > >> > > > > > > >> > > > > > still make it work in a backward > >> compatible > >> > > > >> manner > >> > > > >> > > after > >> > > > >> > > > > > > careful > >> > > > >> > > > > > > >> > > design > >> > > > >> > > > > > > >> > > > > and > >> > > > >> > > > > > > >> > > > > > discussion. But these changes can make > >> the > >> > > > >> > consumer's > >> > > > >> > > > > > > interface > >> > > > >> > > > > > > >> > > > > > unnecessarily complex for more users > >> who do > >> > > not > >> > > > >> > store > >> > > > >> > > > > offset > >> > > > >> > > > > > > >> > > > externally. > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > After thinking more about it, we can > >> > address > >> > > > all > >> > > > >> > > > problems > >> > > > >> > > > > > > >> discussed > >> > > > >> > > > > > > >> > > by > >> > > > >> > > > > > > >> > > > > only > >> > > > >> > > > > > > >> > > > > > using the metadata_epoch without > >> > introducing > >> > > > >> > > > leader_epoch > >> > > > >> > > > > or > >> > > > >> > > > > > > the > >> > > > >> > > > > > > >> > > > > > partition_epoch. The current KIP > >> describes > >> > > the > >> > > > >> > changes > >> > > > >> > > > to > >> > > > >> > > > > > the > >> > > > >> > > > > > > >> > > consumer > >> > > > >> > > > > > > >> > > > > API > >> > > > >> > > > > > > >> > > > > > and how the new API can be used if > user > >> > > stores > >> > > > >> > offset > >> > > > >> > > > > > > >> externally. > >> > > > >> > > > > > > >> > In > >> > > > >> > > > > > > >> > > > > order > >> > > > >> > > > > > > >> > > > > > to address the scenario you described > >> > > earlier, > >> > > > we > >> > > > >> > can > >> > > > >> > > > > > include > >> > > > >> > > > > > > >> > > > > > metadata_epoch in the FetchResponse > and > >> the > >> > > > >> > > > > > > LeaderAndIsrRequest. > >> > > > >> > > > > > > >> > > > Consumer > >> > > > >> > > > > > > >> > > > > > remembers the largest metadata_epoch > >> from > >> > all > >> > > > the > >> > > > >> > > > > > > FetchResponse > >> > > > >> > > > > > > >> it > >> > > > >> > > > > > > >> > > has > >> > > > >> > > > > > > >> > > > > > received. 
The metadata_epoch committed > >> with > >> > > the > >> > > > >> > > offset, > >> > > > >> > > > > > either > >> > > > >> > > > > > > >> > within > >> > > > >> > > > > > > >> > > > or > >> > > > >> > > > > > > >> > > > > > outside Kafka, should be the largest > >> > > > >> metadata_epoch > >> > > > >> > > > across > >> > > > >> > > > > > all > >> > > > >> > > > > > > >> > > > > > FetchResponse and MetadataResponse > ever > >> > > > received > >> > > > >> by > >> > > > >> > > this > >> > > > >> > > > > > > >> consumer. > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > The drawback of using only the > >> > metadata_epoch > >> > > > is > >> > > > >> > that > >> > > > >> > > we > >> > > > >> > > > > can > >> > > > >> > > > > > > not > >> > > > >> > > > > > > >> > > always > >> > > > >> > > > > > > >> > > > > do > >> > > > >> > > > > > > >> > > > > > the smart offset reset in case of > >> unclean > >> > > > leader > >> > > > >> > > > election > >> > > > >> > > > > > > which > >> > > > >> > > > > > > >> you > >> > > > >> > > > > > > >> > > > > > mentioned earlier. But in most case, > >> > unclean > >> > > > >> leader > >> > > > >> > > > > election > >> > > > >> > > > > > > >> > probably > >> > > > >> > > > > > > >> > > > > > happens when consumer is not > >> > > > >> rebalancing/restarting. > >> > > > >> > > In > >> > > > >> > > > > > these > >> > > > >> > > > > > > >> > cases, > >> > > > >> > > > > > > >> > > > > either > >> > > > >> > > > > > > >> > > > > > consumer is not directly affected by > >> > unclean > >> > > > >> leader > >> > > > >> > > > > election > >> > > > >> > > > > > > >> since > >> > > > >> > > > > > > >> > it > >> > > > >> > > > > > > >> > > > is > >> > > > >> > > > > > > >> > > > > > not consuming from the end of the log, > >> or > >> > > > >> consumer > >> > > > >> > can > >> > > > >> > > > > > derive > >> > > > >> > > > > > > >> the > >> > > > >> > > > > > > >> > > > > > leader_epoch from the most recent > >> message > >> > > > >> received > >> > > > >> > > > before > >> > > > >> > > > > it > >> > > > >> > > > > > > >> sees > >> > > > >> > > > > > > >> > > > > > OffsetOutOfRangeException. So I am not > >> sure > >> > > it > >> > > > is > >> > > > >> > > worth > >> > > > >> > > > > > adding > >> > > > >> > > > > > > >> the > >> > > > >> > > > > > > >> > > > > > leader_epoch to consumer API to > address > >> the > >> > > > >> > remaining > >> > > > >> > > > > corner > >> > > > >> > > > > > > >> case. > >> > > > >> > > > > > > >> > > What > >> > > > >> > > > > > > >> > > > > do > >> > > > >> > > > > > > >> > > > > > you think? > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > Thanks, > >> > > > >> > > > > > > >> > > > > > Dong > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun > Rao > >> < > >> > > > >> > > > j...@confluent.io > >> > > > >> > > > > > > >> > > > >> > > > > > > >> wrote: > >> > > > >> > > > > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > > > Hi, Dong, > >> > > > >> > > > > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > Thanks for the reply. 
> >> > > > >> > > > > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > To solve the topic recreation issue, > >> we > >> > > could > >> > > > >> use > >> > > > >> > > > > either a > >> > > > >> > > > > > > >> global > >> > > > >> > > > > > > >> > > > > > metadata > >> > > > >> > > > > > > >> > > > > > > version or a partition level epoch. > >> But > >> > > > either > >> > > > >> one > >> > > > >> > > > will > >> > > > >> > > > > > be a > >> > > > >> > > > > > > >> new > >> > > > >> > > > > > > >> > > > > concept, > >> > > > >> > > > > > > >> > > > > > > right? To me, the latter seems more > >> > > natural. > >> > > > It > >> > > > >> > also > >> > > > >> > > > > makes > >> > > > >> > > > > > > it > >> > > > >> > > > > > > >> > > easier > >> > > > >> > > > > > > >> > > > to > >> > > > >> > > > > > > >> > > > > > > detect if a consumer's offset is > still > >> > > valid > >> > > > >> > after a > >> > > > >> > > > > topic > >> > > > >> > > > > > > is > >> > > > >> > > > > > > >> > > > > recreated. > >> > > > >> > > > > > > >> > > > > > As > >> > > > >> > > > > > > >> > > > > > > you pointed out, we don't need to > >> store > >> > the > >> > > > >> > > partition > >> > > > >> > > > > > epoch > >> > > > >> > > > > > > in > >> > > > >> > > > > > > >> > the > >> > > > >> > > > > > > >> > > > > > message. > >> > > > >> > > > > > > >> > > > > > > The following is what I am thinking. > >> > When a > >> > > > >> > > partition > >> > > > >> > > > is > >> > > > >> > > > > > > >> created, > >> > > > >> > > > > > > >> > > we > >> > > > >> > > > > > > >> > > > > can > >> > > > >> > > > > > > >> > > > > > > assign a partition epoch from an > >> > > > >> ever-increasing > >> > > > >> > > > global > >> > > > >> > > > > > > >> counter > >> > > > >> > > > > > > >> > and > >> > > > >> > > > > > > >> > > > > store > >> > > > >> > > > > > > >> > > > > > > it in /brokers/topics/[topic]/ > >> > > > >> > > > partitions/[partitionId] > >> > > > >> > > > > in > >> > > > >> > > > > > > ZK. > >> > > > >> > > > > > > >> > The > >> > > > >> > > > > > > >> > > > > > > partition > >> > > > >> > > > > > > >> > > > > > > epoch is propagated to every broker. > >> The > >> > > > >> consumer > >> > > > >> > > will > >> > > > >> > > > > be > >> > > > >> > > > > > > >> > tracking > >> > > > >> > > > > > > >> > > a > >> > > > >> > > > > > > >> > > > > > tuple > >> > > > >> > > > > > > >> > > > > > > of <offset, leader epoch, partition > >> > epoch> > >> > > > for > >> > > > >> > > > offsets. > >> > > > >> > > > > > If a > >> > > > >> > > > > > > >> > topic > >> > > > >> > > > > > > >> > > is > >> > > > >> > > > > > > >> > > > > > > recreated, it's possible that a > >> > consumer's > >> > > > >> offset > >> > > > >> > > and > >> > > > >> > > > > > leader > >> > > > >> > > > > > > >> > epoch > >> > > > >> > > > > > > >> > > > > still > >> > > > >> > > > > > > >> > > > > > > match that in the broker, but > >> partition > >> > > epoch > >> > > > >> > won't > >> > > > >> > > > be. 
> >> > > > >> > > > > In > >> > > > >> > > > > > > >> this > >> > > > >> > > > > > > >> > > case, > >> > > > >> > > > > > > >> > > > > we > >> > > > >> > > > > > > >> > > > > > > can potentially still treat the > >> > consumer's > >> > > > >> offset > >> > > > >> > as > >> > > > >> > > > out > >> > > > >> > > > > > of > >> > > > >> > > > > > > >> range > >> > > > >> > > > > > > >> > > and > >> > > > >> > > > > > > >> > > > > > reset > >> > > > >> > > > > > > >> > > > > > > the offset based on the offset reset > >> > policy > >> > > > in > >> > > > >> the > >> > > > >> > > > > > consumer. > >> > > > >> > > > > > > >> This > >> > > > >> > > > > > > >> > > > seems > >> > > > >> > > > > > > >> > > > > > > harder to do with a global metadata > >> > > version. > >> > > > >> > > > > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > Jun > >> > > > >> > > > > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, > Dong > >> > Lin < > >> > > > >> > > > > > > >> lindon...@gmail.com> > >> > > > >> > > > > > > >> > > > wrote: > >> > > > >> > > > > > > >> > > > > > > > >> > > > >> > > > > > > >> > > > > > > > Hey Jun, > >> > > > >> > > > > > > >> > > > > > > > > >> > > > >> > > > > > > >> > > > > > > > This is a very good example. After > >> > > thinking > >> > > > >> > > through > >> > > > >> > > > > this > >> > > > >> > > > > > > in > >> > > > >> > > > > > > >> > > > detail, I > >> > > > >> > > > > > > >> > > > > > > agree > >> > > > >> > > > > > > >> > > > > > > > that we need to commit offset with > >> > leader > >> > > > >> epoch > >> > > > >> > in > >> > > > >> > > > > order > >> > > > >> > > > > > > to > >> > > > >> > > > > > > >> > > address > >> > > > >> > > > > > > >> > > > > > this > >> > > > >> > > > > > > >> > > > > > > > example. > >> > > > >> > > > > > > >> > > > > > > > > >> > > > >> > > > > > > >> > > > > > > > I think the remaining question is > >> how > >> > to > >> > > > >> address > >> > > > >> > > the > >> > > > >> > > > > > > >> scenario > >> > > > >> > > > > > > >> > > that > >> > > > >> > > > > > > >> > > > > the > >> > > > >> > > > > > > >> > > > > > > > topic is deleted and re-created. > One > >> > > > possible > >> > > > >> > > > solution > >> > > > >> > > > > > is > >> > > > >> > > > > > > to > >> > > > >> > > > > > > >> > > commit > >> > > > >> > > > > > > >> > > > > > > offset > >> > > > >> > > > > > > >> > > > > > > > with both the leader epoch and the > >> > > metadata > >> > > > >> > > version. > >> > > > >> > > > > The > >> > > > >> > > > > > > >> logic > >> > > > >> > > > > > > >> > > and > >> > > > >> > > > > > > >> > > > > the > >> > > > >> > > > > > > >> > > > > > > > implementation of this solution > does > >> > not > >> > > > >> > require a > >> > > > >> > > > new > >> > > > >> > > > > > > >> concept > >> > > > >> > > > > > > >> > > > (e.g. > >> > > > >> > > > > > > >> > > > > > > > partition epoch) and it does not > >> > require > >> > > > any > >> > > > >> > > change > >> > > > >> > > > to > >> > > > >> > > > > > the > >> > > > >> > > > > > > >> > > message > >> > > > >> > > > > > > >> > > > > > format > >> > > > >> > > > > > > >> > > > > > > > or leader epoch. 
It also allows us > >> to > >> > > order > >> > > > >> the > >> > > > >> > > > > metadata > >> > > > >> > > > > > > in > >> > > > >> > > > > > > >> a > >> > > > >> > > > > > > >> > > > > > > > straightforward manner which may > be > >> > > useful > >> > > > in > >> > > > >> > the > >> > > > >> > > > > > future. > >> > > > >> > > > > > > >> So it > >> > > > >> > > > > > > >> > > may > >> > > > >> > > > > > > >> > > > > be > >> > > > >> > > > > > > >> > > > > > a > >> > > > >> > > > > > > >> > > > > > > > better solution than generating a > >> > random > >> > > > >> > partition > >> > > > >> > > > > epoch > >> > > > >> > > > > > > >> every > >> > > > >> > > > > > > >> > > time > >> > > > >> > > > > > > >> > > > > we > >> > > > >> > > > > > > >> > > > > > > > create a partition. Does this > sound > >> > > > >> reasonable? > >> > > > >> > > > > > > >> > > > > > > > > >> > > > >> > > > > > > >> > > > > > > > Previously one concern with using > >> the > >> > > > >> metadata > >> > > > >> > > > version > >> > > > >> > > > > > is > >> > > > >> > > > > > > >> that > >> > > > >> > > > > > > >> > > > > consumer > >> > > > >> > > > > > > >> > > > > > > > will be forced to refresh metadata > >> even > >> > > if > >> > > > >> > > metadata > >> > > > >> > > > > > > version > >> > > > >> > > > > > > >> is > >> > > > >> > > > > > > >> > > > > > increased > >> > > > >> > > > > > > >> > > > > > > > due to topics that the consumer is > >> not > >> > > > >> > interested > >> > > > >> > > > in. > >> > > > >> > > > > > Now > >> > > > >> > > > > > > I > >> > > > >> > > > > > > >> > > > realized > >> > > > >> > > > > > > >> > > > > > that > >> > > > >> > > > > > > >> > > > > > > > this is probably not a problem. > >> > Currently > >> > > > >> client > >> > > > >> > > > will > >> > > > >> > > > > > > >> refresh > >> > > > >> > > > > > > >> > > > > metadata > >> > > > >> > > > > > > >> > > > > > > > either due to > >> InvalidMetadataException > >> > in > >> > > > the > >> > > > >> > > > response > >> > > > >> > > > > > > from > >> > > > >> > > > > > > >> > > broker > >> > > > >> > > > > > > >> > > > or > >> > > > >> > > > > > > >> > > > > > due > >> > > > >> > > > > > > >> > > > > > > > to metadata expiry. The addition > of > >> the > >> > > > >> metadata > >> > > > >> > > > > version > >> > > > >> > > > > > > >> should > >> > > > >> > > > > > > >> > > > > > increase > >> > > > >> > > > > > > >> > > > > > > > the overhead of metadata refresh > >> caused > >> > > by > >> > > > >> > > > > > > >> > > > InvalidMetadataException. > >> > > > >> > > > > > > >> > > > > If > >> > > > >> > > > > > > >> > > > > > > > client refresh metadata due to > >> expiry > >> > and > >> > > > it > >> > > > >> > > > receives > >> > > > >> > > > > a > >> > > > >> > > > > > > >> > metadata > >> > > > >> > > > > > > >> > > > > whose > >> > > > >> > > > > > > >> > > > > > > > version is lower than the current > >> > > metadata > >> > > > >> > > version, > >> > > > >> > > > we > >> > > > >> > > > > > can > >> > > > >> > > > > > > >> > reject > >> > > > >> > > > > > > >> > > > the > >> > > > >> > > > > > > >> > > > > > > > metadata but still reset the > >> metadata > >> > > age, > >> > > > >> which > >> > > > >> > > > > > > essentially > >> > > > >> > > > > > > >> > keep > >> > > > >> > > > > > > >> > > > the > >> > > > >> > > > > > > >> > > > > > > > existing behavior in the client. 
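A rough sketch of the refresh behavior described above, with invented names: a response carrying an older version than the cached one is ignored, but the refresh still resets the metadata age so the client keeps its existing expiry behavior:

// Hypothetical client-side cache illustrating "reject the metadata but reset the age".
class ClientMetadataCache {
    private long cachedVersion = -1L;
    private long lastRefreshMs = 0L;

    void onMetadataResponse(long version, long nowMs) {
        lastRefreshMs = nowMs;      // the refresh itself always resets the metadata age
        if (version < cachedVersion)
            return;                 // stale payload: keep the currently cached metadata
        cachedVersion = version;
        // ... update cached topic/partition state from the response here ...
    }

    boolean metadataExpired(long nowMs, long metadataMaxAgeMs) {
        return nowMs - lastRefreshMs >= metadataMaxAgeMs;
    }
}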
> >> > > > >> > > > > > > > Thanks much,
> >> > > > >> > > > > > > > Dong