For the record, this KIP is closed, as its design has been merged into KIP-320. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation .
On Wed, Jan 31, 2018 at 12:16 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun, Jason,

Thanks for all the comments. Could you see if you can give +1 for the KIP? I am open to making further improvements to the KIP.

Thanks,
Dong

On Tue, Jan 23, 2018 at 3:44 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun, Jason,

Thanks much for all the review! I will open the voting thread.

Regards,
Dong

On Tue, Jan 23, 2018 at 3:37 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

The current KIP looks good to me.

Thanks,

Jun

On Tue, Jan 23, 2018 at 12:29 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Do you think the current KIP looks OK? I am wondering if we can open the voting thread.

Thanks!
Dong

On Fri, Jan 19, 2018 at 3:08 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

I think we can probably have a static method in a Util class to decode the byte[]. Both the KafkaConsumer implementation and the user application will be able to decode the byte array and log its content for debugging purposes. So it seems that we can still print the information we want; it is just not explicitly exposed in the consumer interface. Would this address the problem here?

Yeah, we can include OffsetEpoch in AdminClient. Can this be added in KIP-222? Is there something you would like me to add in this KIP?

Thanks!
Dong

On Fri, Jan 19, 2018 at 3:00 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

The issue with using just byte[] for OffsetEpoch is that it won't be printable, which makes debugging harder.

Also, KIP-222 proposes a listGroupOffset() method in AdminClient.
If that gets adopted before this KIP, we probably want to include OffsetEpoch in the AdminClient too.

Thanks,

Jun

On Thu, Jan 18, 2018 at 6:30 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

I agree. I have updated the KIP to remove the class OffsetEpoch and replace OffsetEpoch with byte[] in the APIs that use it. Can you see if it looks good?

Thanks!
Dong

On Thu, Jan 18, 2018 at 6:07 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the updated KIP. It looks good to me now. The only remaining thing is OffsetEpoch. If we expose the individual fields in the class, we probably don't need the encode/decode methods. If we want to hide the details of OffsetEpoch, we probably don't want to expose the individual fields.

Jun

On Wed, Jan 17, 2018 at 10:10 AM, Dong Lin <lindon...@gmail.com> wrote:

Thinking about point 61 more, I realize that the async zookeeper read may make it less of an issue for the controller to read more zookeeper nodes. Writing the partition_epoch in the per-partition znode also makes it simpler to handle a broker failure between the zookeeper writes for a topic creation. I have updated the KIP to use the suggested approach.

On Wed, Jan 17, 2018 at 9:57 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for the comments.
Please see my comments inline.

On Tue, Jan 16, 2018 at 4:38 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Looks good to me overall. Just a few minor
> comments.
>
> 60. OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition):
> It seems that there is no need to return metadata. We probably want to
> return sth like OffsetAndEpoch.

Previously I thought we might want to re-use the existing class to keep our consumer interface simpler. I have updated the KIP to add the class OffsetAndOffsetEpoch. I didn't use OffsetAndEpoch because users may confuse that name with OffsetEpoch. Does this sound OK?

> 61. Should we store partition_epoch in
> /brokers/topics/[topic]/partitions/[partitionId] in ZK?

I have considered this. I think the advantage of adding the partition->partition_epoch map in the existing znode /brokers/topics/[topic]/partitions is that the controller only needs to read one znode per topic to get its partition_epoch information. Otherwise the controller may need to read one extra znode per partition to get the same information.
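[Editor's note] For concreteness, the two znode layouts being compared might look as follows; the field names here are illustrative assumptions, not the KIP's final schema. Option A keeps a partition->partition_epoch map in the per-topic znode /brokers/topics/[topic]/partitions:

```json
{
  "version": 1,
  "partition_epochs": { "0": 5, "1": 5, "2": 7 }
}
```

Option B would instead store a single epoch in each per-partition znode /brokers/topics/[topic]/partitions/[partitionId], which keeps writes localized to one partition but requires the controller to read one znode per partition.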
When we delete partitions or expand the partitions of a topic, someone needs to modify the partition->partition_epoch map in the znode /brokers/topics/[topic]/partitions. This may seem a bit more complicated than simply adding or deleting the znode /brokers/topics/[topic]/partitions/[partitionId]. But the complexity is probably similar to the existing operation of modifying the partition->replica_list mapping in the znode /brokers/topics/[topic]. So I am not sure it is better to store the partition_epoch in /brokers/topics/[topic]/partitions/[partitionId]. What do you think?

> 62. For checking outdated metadata in the client, we probably want to add
> when max_partition_epoch will be used.

The max_partition_epoch is used in the Proposed Changes -> Client's metadata refresh section to determine whether the metadata is outdated. And this formula is referenced and re-used in other sections to determine whether the metadata is outdated. Does this formula look OK?

> 63. "The leader_epoch should be the largest leader_epoch of messages
> whose offset < the commit offset. If no message has been consumed since
> consumer initialization, the leader_epoch from seek(...) or
> OffsetFetchResponse should be used. The partition_epoch should be read
> from the last FetchResponse corresponding to the given partition and
> commit offset.": leader_epoch and partition_epoch are associated with an
> offset. So, if no message is consumed, there is no offset and therefore
> there is no need to read leader_epoch and partition_epoch. Also, the
> leader_epoch associated with the offset should just come from the
> messages returned in the fetch response.

I am thinking that, if the user calls seek(...) and commitSync(...) without consuming any messages, we should re-use the leader_epoch and partition_epoch provided by seek(...) in the OffsetCommitRequest. And if messages have been successfully consumed, then the leader_epoch will come from the messages returned in the fetch response. The condition "messages whose offset < the commit offset" is needed to take care of log compacted topics, which may have offset gaps due to log cleaning.

Did I miss something here? Or should I rephrase the paragraph to make it less confusing?

> 64. Could you include the public methods in the OffsetEpoch class?

I mistakenly deleted the definition of the OffsetEpoch class from the KIP. I just added it back with the public methods. Could you take another look?
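[Editor's note] The commit rule discussed in 63) above can be sketched as follows; the class and method names are hypothetical, not part of the KIP. The epoch committed with an offset is the largest leader_epoch among already-consumed records with offset < the commit offset; if nothing has been consumed, the epoch supplied by seek(...) or the OffsetFetchResponse is used instead.

```java
import java.util.List;

// Minimal sketch of the proposed commit rule; all names are hypothetical.
public class CommitEpochRule {

    // (offset, leaderEpoch) of one consumed record.
    public record Consumed(long offset, int leaderEpoch) {}

    public static int epochForCommit(List<Consumed> consumed, long commitOffset, int seekEpoch) {
        int best = -1;
        for (Consumed c : consumed) {
            // "offset < commit offset" tolerates offset gaps left by log
            // cleaning in compacted topics.
            if (c.offset() < commitOffset) {
                best = Math.max(best, c.leaderEpoch());
            }
        }
        // Fall back to the seek()/OffsetFetchResponse epoch when nothing
        // has been consumed below the commit offset.
        return best >= 0 ? best : seekEpoch;
    }
}
```

Note how the fallback branch captures the seek-then-commitSync case debated above, while the max over consumed records covers the normal consume path.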
> Jun

On Thu, Jan 11, 2018 at 5:43 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much. I agree that we cannot rely on committed offsets always being deleted when we delete a topic. So it is necessary to use a per-partition epoch that does not change unless the partition is deleted. I also agree that it is very nice to be able to uniquely identify a message with (offset, leader_epoch, partition_epoch) in the face of potential topic deletion and unclean leader election.

I agree with all your comments. And I have updated the KIP based on our latest discussion. In addition, I added InvalidPartitionEpochException, which will be thrown by consumer.poll() if the partition_epoch associated with the partition (which can be given to the consumer using seek(...)) is different from the partition_epoch in the FetchResponse.

Can you take another look at the latest KIP?

Thanks!
Dong

On Wed, Jan 10, 2018 at 2:24 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

My replies are the following.
60. What you described could also work. The drawback is that we will be unnecessarily changing the partition epoch when a partition hasn't really changed. I was imagining that the partition epoch would be stored in /brokers/topics/[topic]/partitions/[partitionId], instead of at the topic level. So, not sure if the ZK size limit is an issue.

61, 62 and 65. To me, the offset + offset_epoch is a unique identifier for a message. So, if a message hasn't changed, the offset and the associated offset_epoch ideally should remain the same (it would be kind of weird if two consumer apps saved the offset of the same message, but the offset_epochs were different). partition_epoch + leader_epoch give us that; global_epoch + leader_epoch don't. If we use this approach, we can more reliably solve not only the problem that you have identified, but also other problems when there is data loss or topic re-creation.
For example, in the future, if we include the partition_epoch and leader_epoch in the fetch request, the server can do a more reliable check of whether that offset is valid or not. I am not sure that we can rely upon all external offsets being removed on topic deletion. For example, a topic may be deleted by an admin who may not know all the applications.

If we agree on the above, the second question is then how to reliably propagate the partition_epoch and the leader_epoch to the consumer when there are leader or partition changes. The leader_epoch comes from the message, which is reliable. So, I was suggesting that when we store an offset, we can just store the leader_epoch from the message set containing that offset. Similarly, I was thinking that if the partition_epoch is in the fetch response, we can propagate the partition_epoch reliably when there is a partition_epoch change.

63. My point is that once a leader is producing messages in the new partition_epoch, ideally, we should associate the new offsets with the new partition_epoch.
Otherwise, the offset_epoch won't be the correct unique identifier (useful for solving the other problems mentioned above). I was originally thinking that the leader would include the partition_epoch from the metadata cache in the fetch response. It's just that right now, the metadata cache is updated on UpdateMetadataRequest, which typically happens after the LeaderAndIsrRequest. Another approach is for the leader to cache the partition_epoch in the Partition object and return that (instead of the one in the metadata cache) in the fetch response.

65. It seems to me that the global_epoch and the partition_epoch have different purposes. A partition_epoch has the benefit that it (1) can be used to form a unique identifier for a message and (2) can be used to solve other corner case problems in the future. I am not sure having just a global_epoch can achieve these. The global_epoch is useful to determine which version of the metadata is newer, especially with topic deletion.
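[Editor's note] The "unique identifier" argument in 61, 62 and 65 can be sketched as a lexicographic comparison; the field names are assumptions for illustration. A partition_epoch change (topic deletion and re-creation) dominates, so an offset saved under an older partition_epoch is recognizably stale even though the leader_epoch restarted at 0 on the re-created partition.

```java
// Sketch of lexicographic ordering on (partition_epoch, leader_epoch) pairs.
public class EpochOrder {
    public static int compare(int partitionEpochA, int leaderEpochA,
                              int partitionEpochB, int leaderEpochB) {
        if (partitionEpochA != partitionEpochB) {
            // A partition_epoch change (delete/re-create) dominates.
            return Integer.compare(partitionEpochA, partitionEpochB);
        }
        // Within the same partition_epoch, the leader_epoch decides.
        return Integer.compare(leaderEpochA, leaderEpochB);
    }
}
```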
Thanks,

Jun

On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindon...@gmail.com> wrote:

Regarding the use of the global epoch in 65), it is very similar to the proposal of the metadata_epoch we discussed earlier. The main difference is that this epoch is incremented when we create/expand/delete a topic and does not change when the controller re-sends metadata.

I looked at our previous discussion. It seems that we preferred partition_epoch over metadata_epoch because 1) we prefer not to have an ever-growing metadata_epoch and 2) we can reset offsets better when a topic is re-created. The use of a global topic_epoch avoids the drawback of a quickly growing metadata_epoch. Though the global epoch does not allow us to recognize an invalid offset committed before the topic re-creation, we can probably just delete the offset when we delete a topic.
Thus I am not very sure whether it is still worthwhile to have a per-partition partition_epoch if the metadata already has the global epoch.

On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks so much. These comments are very useful. Please see my comments below.

On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. A few more comments.
>
> 60. Perhaps having a partition epoch is more flexible since in the
> future, we may support deleting a partition as well.

Yeah, I have considered this. I think we can probably still support deleting a partition by using the topic_epoch -- when a partition of a topic is deleted or created, the epoch of all partitions of this topic will be incremented by 1.
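[Editor's note] A toy model of the topic_epoch scheme just described; the structure is an assumption for illustration only. Any partition creation or deletion bumps one per-topic epoch shared by all partitions, so a partition that is deleted and later re-created necessarily reports a larger epoch than it had before the deletion.

```java
// Toy model: one shared per-topic epoch, bumped on any partition change.
public class TopicEpochModel {
    private int topicEpoch = 0;

    public void onPartitionCreated() { topicEpoch++; }

    public void onPartitionDeleted() { topicEpoch++; }

    // Every live partition of the topic reports the shared epoch.
    public int partitionEpoch() { return topicEpoch; }
}
```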
Therefore, if that partition is re-created later, the epoch of that partition will still be larger than its epoch before the deletion, which still allows the client to order the metadata for the purpose of this KIP. Does this sound reasonable?

The advantage of using topic_epoch instead of partition_epoch is that the size of the /brokers/topics/[topic] znode and the request/response size can be smaller. We have a limit on the maximum size of a znode (typically 1MB). Using per-partition epochs would effectively reduce the number of partitions that can be described by the /brokers/topics/[topic] znode.

One use-case of partition_epoch is for the client to detect that the committed offset, either from the kafka offset topic or from an external store, is invalid after partition deletion and re-creation. However, it seems that we can also address this use-case with other approaches.
For example, when AdminClient deletes partitions, it can also delete the committed offsets for those partitions from the offset topic. If the user stores offsets externally, it might make sense for the user to similarly remove offsets of related partitions after these partitions are deleted. So I am not sure that we should use partition_epoch in this KIP.

> 61. It seems that the leader epoch returned in the position() call should
> be the leader epoch returned in the fetch response, not the one in the
> metadata cache of the client.

I think this is a good idea. Just to double check, this change does not affect the correctness or performance of this KIP. But it can be useful if we want to use the leader_epoch to better handle the offset reset in case of unclean leader election, which is listed in the future work. Is this understanding correct?
I have updated the KIP to specify that the leader_epoch returned by position() should be the largest leader_epoch of those already consumed messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek() or OffsetFetchResponse should be used. The offset included in the OffsetCommitRequest will also be determined in a similar manner.

> 62. I am wondering if we should return the partition epoch in the fetch
> response as well. In the current proposal, if a topic is recreated and
> the new leader is on the same broker as the old one, there is nothing to
> force the metadata refresh in the client. So, the client may still
> associate the offset with the old partition epoch.

Could you help me understand the problem if a client associates an old partition_epoch (or the topic_epoch as of the current KIP) with the offset?
The main purpose of the topic_epoch is to be able to drop the leader_epoch to 0 after a partition is deleted and re-created. I guess you may be thinking about using the partition_epoch to detect that the committed offset is invalid? In that case, I am wondering if the alternative approach described in 60) would be reasonable.

> 63. There is some subtle coordination between the LeaderAndIsrRequest
> and UpdateMetadataRequest. Currently, when a leader changes, the
> controller first sends the LeaderAndIsrRequest to the assigned replicas
> and the UpdateMetadataRequest to every broker. So, there could be a small
> window when the leader already receives the new partition epoch in the
> LeaderAndIsrRequest, but the metadata cache in the broker hasn't been
> updated with the latest partition epoch. Not sure what's the best way to
> address this issue. Perhaps we can update the metadata cache on the
> broker with both LeaderAndIsrRequest and UpdateMetadataRequest.
> The challenge is that the two have slightly different data. For example,
> only the latter has all endpoints.

I am not sure whether this is a problem. Could you explain a bit more what specific problem this small window can cause?

Since the client can fetch metadata from any broker in the cluster, and given that different brokers receive requests (e.g. LeaderAndIsrRequest and UpdateMetadataRequest) in arbitrary order, the metadata received by the client can be in arbitrary order (either newer or older) compared to the broker's leadership state, even if a given broker receives the LeaderAndIsrRequest and UpdateMetadataRequest simultaneously. So I am not sure it is useful to update the broker's cache with LeaderAndIsrRequest.

> 64. The enforcement of leader epoch in offset commit: We allow a consumer
> to set an arbitrary offset.
> So it's possible for offsets or leader epochs to go backwards. I am not
> sure if we could always enforce that the leader epoch only goes up on the
> broker.

Sure. I have removed this check from the KIP.

BTW, we can probably still ensure that the leader_epoch always increases if the leader_epoch used with the offset commit is the max(leader_epoch of the message with offset = the committed offset - 1, the largest known leader_epoch from the metadata). But I don't have a good use-case for this alternative definition. So I chose to keep the KIP simple by not requiring the leader_epoch to always increase.

> 65. Good point on handling the missing partition epoch due to topic
> deletion. Another potential way to address this is to additionally
> propagate the global partition epoch to brokers and the clients. This
> way, when a partition epoch is missing, we can use the global partition
> epoch to reason about which metadata is more recent.
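[Editor's note] A minimal sketch of the fallback Jun suggests in 65), with assumed names: when the per-partition epoch is missing from one metadata snapshot (e.g. because the topic was deleted), the propagated global epoch decides which snapshot is more recent.

```java
// Sketch of metadata recency with a global-epoch fallback; names are assumed.
public class MetadataRecency {
    // Returns true if snapshot A is at least as recent as snapshot B.
    // A null partition epoch means the partition is absent from that snapshot.
    public static boolean atLeastAsRecent(Integer partitionEpochA, int globalEpochA,
                                          Integer partitionEpochB, int globalEpochB) {
        if (partitionEpochA != null && partitionEpochB != null) {
            return partitionEpochA >= partitionEpochB;
        }
        // Partition epoch missing on one side: reason with the global epoch.
        return globalEpochA >= globalEpochB;
    }
}
```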
This is a great idea. The global epoch can be used to order the metadata
and help us recognize the more recent metadata if a topic (or partition) is
deleted and re-created.

Actually, it seems we only need to propagate the global epoch to brokers
and clients, without propagating this epoch on a per-topic or per-partition
basis. Doing so would simplify the interface changes made in this KIP. Does
this approach sound reasonable?

> 66. A client may also get an offset by time using the offsetForTimes()
> api. So, we probably want to include offsetInternalMetadata in
> OffsetAndTimestamp as well.

You are right. This probably also requires us to change the
ListOffsetRequest as well. I will update the KIP after we agree on the
solution for 65).

> 67. InternalMetadata can be a bit confusing with the metadata field
> already there. Perhaps we can just call it OffsetEpoch.
> It might be useful to make OffsetEpoch printable, at least for debugging
> purposes. Once you do that, we are already exposing the internal fields,
> so I am not sure it's worth hiding them. If we do want to hide them,
> perhaps we can have something like the following. The binary encoding is
> probably more efficient than JSON for external storage.
>
> OffsetEpoch {
>   static OffsetEpoch decode(byte[]);
>
>   public byte[] encode();
>
>   public String toString();
> }

Thanks much. I like this solution. I have updated the KIP accordingly.

> Jun

On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Certainly. This sounds good.
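The OffsetEpoch interface sketched above could be fleshed out along these lines. This is purely an illustrative sketch: the (partition_epoch, leader_epoch) layout and the leading version byte are assumptions drawn from later messages in this thread, not the KIP's final wire format.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the opaque OffsetEpoch type discussed above.
// Assumed layout: 1-byte version, 4-byte partitionEpoch, 4-byte leaderEpoch.
final class OffsetEpoch {
    private static final byte CURRENT_VERSION = 0;
    private final int partitionEpoch;
    private final int leaderEpoch;

    OffsetEpoch(int partitionEpoch, int leaderEpoch) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    // Decode the opaque byte array back into an OffsetEpoch.
    static OffsetEpoch decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte version = buf.get();
        if (version != CURRENT_VERSION)
            throw new IllegalArgumentException("Unknown OffsetEpoch version: " + version);
        return new OffsetEpoch(buf.getInt(), buf.getInt());
    }

    // Compact binary encoding, suitable for external storage.
    byte[] encode() {
        return ByteBuffer.allocate(9)
                .put(CURRENT_VERSION)
                .putInt(partitionEpoch)
                .putInt(leaderEpoch)
                .array();
    }

    // Printable form for debugging, as suggested in the thread.
    @Override
    public String toString() {
        return "OffsetEpoch(partitionEpoch=" + partitionEpoch
                + ", leaderEpoch=" + leaderEpoch + ")";
    }

    int partitionEpoch() { return partitionEpoch; }
    int leaderEpoch() { return leaderEpoch; }
}
```

A static `decode` plus instance `encode`/`toString` keeps the type opaque in the public API while still letting both the consumer and user code log its contents for debugging.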
I have updated the KIP to clarify that the global epoch will be incremented
by 1 each time a topic is deleted.

Thanks,
Dong

On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Dong,

> I think your approach will allow the user to distinguish between the
> metadata before and after the topic deletion. I also agree that this can
> potentially be useful to the user. I am just not very sure whether we
> already have a good use-case to make the additional complexity
> worthwhile. It seems that this feature is kind of independent of the main
> problem of this KIP. Could we add this as a future work?
Do you think it's fair if we bump the topic epoch on deletion and leave
propagation of the epoch for deleted topics for future work? I don't think
this adds much complexity, and it makes the behavior consistent: every
topic mutation results in an epoch bump.

Thanks,
Jason

On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Ismael,

I guess we actually need the user to see this field so that the user can
store this value in the external store together with the offset. We just
prefer the value to be opaque, to discourage most users from interpreting
this value.
One more advantage of using such an opaque field is being able to evolve
the information (or schema) of this value without changing the consumer API
in the future.

I also think it is probably OK for the user to be able to interpret this
value, particularly for advanced users.

Thanks,
Dong

On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:

On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <ja...@confluent.io> wrote:

> class OffsetAndMetadata {
>   long offset;
>   byte[] offsetMetadata;
>   String metadata;
> }
>
> Admittedly, the naming is a bit annoying, but we can probably come up
> with something better.
> Internally the byte array would have a version. If in the future we have
> anything else we need to add, we can update the version and we wouldn't
> need any new APIs.

We can also add fields to a class in a compatible way. So, it seems to me
that the main advantage of the byte array is that it's opaque to the user.
Is that correct? If so, we could also add any opaque metadata in a subclass
so that users don't even see it (unless they cast it, but then they're on
their own).
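The version-prefixed byte array described above, where bumping the version lets new fields ride along without any new public API, might be sketched as follows. The field layout is an assumption for illustration only.

```java
import java.nio.ByteBuffer;

// Sketch: a leading version byte lets the encoded offsetMetadata grow new
// fields without changing any public consumer API. Layout is hypothetical:
// v0 carries only a leader epoch; v1 adds a partition epoch.
final class OffsetMetadataCodec {
    static byte[] encodeV0(int leaderEpoch) {
        return ByteBuffer.allocate(5)
                .put((byte) 0).putInt(leaderEpoch).array();
    }

    static byte[] encodeV1(int leaderEpoch, int partitionEpoch) {
        return ByteBuffer.allocate(9)
                .put((byte) 1).putInt(leaderEpoch).putInt(partitionEpoch).array();
    }

    // A decoder keyed on the version byte accepts both old and new encodings.
    static int[] decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte version = buf.get();
        switch (version) {
            case 0: return new int[] { buf.getInt() };
            case 1: return new int[] { buf.getInt(), buf.getInt() };
            default: throw new IllegalArgumentException("Unknown version " + version);
        }
    }
}
```

The point of the sketch is that an old consumer storing a v0 payload externally keeps working after the format gains a field, since the decoder branches on the version byte rather than on a fixed length.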
Ismael

> The corresponding seek() and position() APIs might look something like
> this:
>
> void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
> byte[] positionMetadata(TopicPartition partition);
>
> What do you think?
>
> Thanks,
> Jason

On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun, Jason,

Thanks much for all the feedback. I have updated the KIP based on the
latest discussion. Can you help check whether it looks good?
Thanks,
Dong

On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Hmm... thinking about this more, I am not sure that the proposed API is
sufficient. For users that store offsets externally, we probably need an
extra API to return the leader_epoch and partition_epoch for all partitions
that consumers are consuming. I suppose these users currently use
position() to get the offset. Thus we probably need a new method
positionWithEpoch(..) to return <offset, partition_epoch, leader_epoch>.
Does this sound reasonable?
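The proposed positionWithEpoch(..) could return a small value object like the one below. The class name and the validation rule are hypothetical, added here only to illustrate how a user storing offsets externally might check a restored position.

```java
// Hypothetical return type for the proposed positionWithEpoch(..): the
// offset plus the epochs needed to validate it later. Not a real
// kafka-clients class; the validation rule is an illustrative assumption.
final class PositionWithEpoch {
    final long offset;
    final int partitionEpoch;
    final int leaderEpoch;

    PositionWithEpoch(long offset, int partitionEpoch, int leaderEpoch) {
        this.offset = offset;
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    // A stored position is only trustworthy if the partition has not been
    // re-created and the stored leader epoch is not ahead of what the
    // current metadata reports.
    boolean isValidAgainst(int currentPartitionEpoch, int currentLeaderEpoch) {
        if (partitionEpoch != currentPartitionEpoch)
            return false; // topic deleted and re-created: offset is meaningless
        return leaderEpoch <= currentLeaderEpoch;
    }
}
```

On restart, the user would load this tuple from the external store and only seek to the saved offset if the check passes; otherwise the offset must be reset.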
Thanks,
Dong

On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Yes, that's what I am thinking. OffsetEpoch will be composed of
(partition_epoch, leader_epoch).

Thanks,

Jun

On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much. I like the new API that you proposed. I am not sure what you
exactly mean by offset_epoch.
I suppose that we can use the pair of (partition_epoch, leader_epoch) as
the offset_epoch, right?

Thanks,
Dong

On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Got it. The api that you proposed works. The question is whether that's the
api that we want to have in the long term. My concern is that while the api
change is simple, the new api seems harder to explain and use.
For example, a consumer storing offsets externally now needs to call
waitForMetadataUpdate() after calling seek().

An alternative approach is to make the following compatible api changes in
Consumer:
* Add an additional OffsetEpoch field in OffsetAndMetadata. (There is no
need to change the commitSync() api.)
* Add a new api seek(TopicPartition partition, long offset, OffsetEpoch
offsetEpoch). We can potentially deprecate the old api seek(TopicPartition
partition, long offset) in the future.
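The alternative API shape sketched above can be illustrated with a toy in-memory consumer like this. All names are illustrative stand-ins, not the real kafka-clients API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the alternative API shape: commitSync() would carry an
// OffsetEpoch inside OffsetAndMetadata, and seek() gains an overload that
// takes the epoch back. Names here are hypothetical.
final class EpochAwareConsumerSketch {
    static final class OffsetEpoch {
        final int partitionEpoch;
        final int leaderEpoch;
        OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }
    }

    static final class OffsetAndMetadata {
        final long offset;
        final OffsetEpoch offsetEpoch; // the proposed additional field
        final String metadata;
        OffsetAndMetadata(long offset, OffsetEpoch offsetEpoch, String metadata) {
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
            this.metadata = metadata;
        }
    }

    private final Map<String, OffsetAndMetadata> positions = new HashMap<>();

    // Proposed overload: restoring an externally stored offset passes the
    // epoch recorded at commit time, so a stale or truncated position can be
    // detected without a separate waitForMetadataUpdate() call.
    void seek(String topicPartition, long offset, OffsetEpoch offsetEpoch) {
        positions.put(topicPartition, new OffsetAndMetadata(offset, offsetEpoch, ""));
    }

    long position(String topicPartition) {
        return positions.get(topicPartition).offset;
    }

    int positionLeaderEpoch(String topicPartition) {
        return positions.get(topicPartition).offsetEpoch.leaderEpoch;
    }
}
```

The design point is symmetry: the same epoch information flows out with commitSync() and back in with seek(), so external offset storage mirrors the broker-side offset management path.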
The alternative approach has a similar amount of api changes as yours but
has the following benefits:
1. The api works in a similar way as how offset management works now and is
probably what we want in the long term.
2. It can reset offsets better when there is data loss due to unclean
leader election or correlated replica failure.
3. It can reset offsets better when a topic is recreated.
Thanks,

Jun

On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Yeah I agree that ideally we don't want an ever-growing global metadata
version. I just think it may be more desirable to keep the consumer API
simple.

In my current proposal, the metadata version returned in the fetch response
will be stored with the offset together.
More specifically, the metadata_epoch in the new offset topic schema will
be the largest metadata_epoch from all the MetadataResponse and
FetchResponse ever received by this consumer.

We probably don't have to change the consumer API for
commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user calls
commitSync(...) to commit offset 10 for a given partition, for most
use-cases, this consumer instance should have consumed the message with
offset 9 from this partition, in which case the consumer can remember and