Regarding the use of the global epoch in 65), it is very similar to the proposal of the metadata_epoch we discussed earlier. The main difference is that this epoch is incremented when we create/expand/delete a topic and does not change when the controller re-sends metadata.
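For illustration, here is a minimal sketch of those bump semantics with hypothetical names (not from the KIP): topic creation, expansion and deletion each increment the epoch, while a controller metadata re-send leaves it unchanged.

    final class GlobalTopicEpoch {
        private long epoch = 0L;

        // Topic-level changes bump the epoch exactly once each.
        long onTopicCreated() { return ++epoch; }
        long onPartitionsAdded() { return ++epoch; }
        long onTopicDeleted() { return ++epoch; }

        // A controller re-send of metadata does not bump the epoch.
        long onMetadataResend() { return epoch; }

        long current() { return epoch; }
    }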
I looked at our previous discussion. It seems that we prefer partition_epoch over the metadata_epoch because 1) we prefer not to have an ever growing metadata_epoch and 2) we can reset the offset better when a topic is re-created. The use of a global topic_epoch avoids the drawback of a quickly growing metadata_epoch. Though the global epoch does not allow us to recognize an invalid offset committed before the topic re-creation, we can probably just delete the offset when we delete a topic. Thus I am not very sure whether it is still worthwhile to have a per-partition partition_epoch if the metadata already has the global epoch.

On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
> Thanks so much. These comments are very useful. Please see below my comments.
> On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:
>> Hi, Dong,
>> Thanks for the updated KIP. A few more comments.
>> 60. Perhaps having a partition epoch is more flexible since in the future, we may support deleting a partition as well.
> Yeah I have considered this. I think we can probably still support deleting a partition by using the topic_epoch -- when a partition of a topic is deleted or created, the epoch of all partitions of this topic will be incremented by 1. Therefore, if that partition is re-created later, the epoch of that partition will still be larger than its epoch before the deletion, which still allows the client to order the metadata for the purpose of this KIP. Does this sound reasonable?
> The advantage of using topic_epoch instead of partition_epoch is that the size of the /brokers/topics/[topic] znode and request/response size can be smaller. We have a limit on the maximum size of a znode (typically 1MB). Using a partition epoch would effectively reduce the number of partitions that can be described by the /brokers/topics/[topic] znode.
> One use-case of partition_epoch is for the client to detect that the committed offset, either from the Kafka offset topic or from an external store, is invalid after partition deletion and re-creation. However, it seems that we can also address this use-case with other approaches. For example, when AdminClient deletes partitions, it can also delete the committed offsets for those partitions from the offset topic. If the user stores offsets externally, it might make sense for the user to similarly remove offsets of related partitions after these partitions are deleted. So I am not sure that we should use partition_epoch in this KIP.
>> 61. It seems that the leader epoch returned in the position() call should be the leader epoch returned in the fetch response, not the one in the metadata cache of the client.
> I think this is a good idea. Just to double check, this change does not affect the correctness or performance of this KIP. But it can be useful if we want to use the leader_epoch to better handle the offset reset in case of unclean leader election, which is listed in the future work. Is this understanding correct?
> I have updated the KIP to specify that the leader_epoch returned by position() should be the largest leader_epoch of those already consumed messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek() or OffsetFetchResponse should be used. The offset included in the OffsetCommitRequest will also be determined in a similar manner.
>> 62. I am wondering if we should return the partition epoch in the fetch response as well. In the current proposal, if a topic is recreated and the new leader is on the same broker as the old one, there is nothing to force the metadata refresh in the client. So, the client may still associate the offset with the old partition epoch.
> Could you help me understand the problem if a client associates an old partition_epoch (or the topic_epoch as of the current KIP) with the offset? The main purpose of the topic_epoch is to be able to drop leader_epoch to 0 after a partition is deleted and re-created. I guess you may be thinking about using the partition_epoch to detect that the committed offset is invalid? In that case, I am wondering if the alternative approach described in 60) would be reasonable.
>> 63. There is some subtle coordination between the LeaderAndIsrRequest and UpdateMetadataRequest. Currently, when a leader changes, the controller first sends the LeaderAndIsrRequest to the assigned replicas and the UpdateMetadataRequest to every broker. So, there could be a small window when the leader already receives the new partition epoch in the LeaderAndIsrRequest, but the metadata cache in the broker hasn't been updated with the latest partition epoch. Not sure what's the best way to address this issue. Perhaps we can update the metadata cache on the broker with both LeaderAndIsrRequest and UpdateMetadataRequest. The challenge is that the two have slightly different data. For example, only the latter has all endpoints.
> I am not sure whether this is a problem. Could you explain a bit more what specific problem this small window can cause?
> Since the client can fetch metadata from any broker in the cluster, and given that different brokers receive requests (e.g. LeaderAndIsrRequest and UpdateMetadataRequest) in arbitrary order, the metadata received by the client can be in arbitrary order (either newer or older) compared to the broker's leadership state, even if a given broker receives LeaderAndIsrRequest and UpdateMetadataRequest simultaneously. So I am not sure it is useful to update the broker's cache with LeaderAndIsrRequest.
>> 64. The enforcement of leader epoch in Offset commit: We allow a consumer to set an arbitrary offset. So it's possible for offsets or leader epoch to go backwards. I am not sure if we could always enforce that the leader epoch only goes up on the broker.
> Sure. I have removed this check from the KIP.
> BTW, we can probably still ensure that the leader_epoch always increases if the leader_epoch used with the offset commit is max(leader_epoch of the message with offset = the committed offset - 1, the largest known leader_epoch from the metadata). But I don't have a good use-case for this alternative definition. So I chose to keep the KIP simple by not requiring leader_epoch to always increase.
>> 65. Good point on handling missing partition epoch due to topic deletion. Another potential way to address this is to additionally propagate the global partition epoch to brokers and the clients. This way, when a partition epoch is missing, we can use the global partition epoch to reason about which metadata is more recent.
> This is a great idea. The global epoch can be used to order the metadata and help us recognize the more recent metadata if a topic (or partition) is deleted and re-created.
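For illustration, here is a minimal sketch of how a client could use such a global epoch to order metadata (the class and field names are hypothetical, not from the KIP): the client keeps the largest epoch it has seen and ignores any metadata carrying a smaller one.

    final class EpochOrderedMetadataCache<T> {
        private long globalEpoch = -1L;
        private T metadata;

        // Accept a metadata response only if its epoch is at least as large as the cached one.
        synchronized boolean maybeUpdate(long responseEpoch, T responseMetadata) {
            if (responseEpoch < globalEpoch) {
                return false;   // stale metadata, e.g. from a broker that has not caught up yet
            }
            globalEpoch = responseEpoch;
            metadata = responseMetadata;
            return true;
        }

        synchronized T current() {
            return metadata;
        }
    }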
> Actually, it seems we only need to propagate the global epoch to brokers and clients without propagating this epoch on a per-topic or per-partition basis. Doing so would simplify the interface changes made in this KIP. Does this approach sound reasonable?
>> 66. A client may also get an offset by time using the offsetForTimes() api. So, we probably want to include offsetInternalMetadata in OffsetAndTimestamp as well.
> You are right. This probably requires us to change the ListOffsetRequest as well. I will update the KIP after we agree on the solution for 65).
>> 67. InternalMetadata can be a bit confusing with the metadata field already there. Perhaps we can just call it OffsetEpoch. It might be useful to make OffsetEpoch printable at least for debugging purposes. Once you do that, we are already exposing the internal fields. So, not sure if it's worth hiding them. If we do want to hide them, perhaps we can have something like the following. The binary encoding is probably more efficient than JSON for external storage.
>>
>> OffsetEpoch {
>>   static OffsetEpoch decode(byte[]);
>>   public byte[] encode();
>>   public String toString();
>> }
> Thanks much. I like this solution. I have updated the KIP accordingly.
>> Jun
>> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > Hey Jason,
>> > Certainly. This sounds good. I have updated the KIP to clarify that the global epoch will be incremented by 1 each time a topic is deleted.
>> > Thanks,
>> > Dong
>> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io> wrote:
>> > > Hi Dong,
>> > > > I think your approach will allow the user to distinguish between the metadata before and after the topic deletion. I also agree that this can potentially be useful to the user. I am just not very sure whether we already have a good use-case to make the additional complexity worthwhile. It seems that this feature is kind of independent of the main problem of this KIP. Could we add this as future work?
>> > > Do you think it's fair if we bump the topic epoch on deletion and leave propagation of the epoch for deleted topics for future work? I don't think this adds much complexity and it makes the behavior consistent: every topic mutation results in an epoch bump.
>> > > Thanks,
>> > > Jason
>> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > Hey Ismael,
>> > > > I guess we actually need the user to see this field so that the user can store this value in the external store together with the offset. We just prefer the value to be opaque to discourage most users from interpreting this value. One more advantage of using such an opaque field is to be able to evolve the information (or schema) of this value without changing the consumer API in the future.
>> > > > I also think it is probably OK for the user to be able to interpret this value, particularly for those advanced users.
>> > > > Thanks,
>> > > > Dong
>> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:
>> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <ja...@confluent.io> wrote:
>> > > > > >
>> > > > > > class OffsetAndMetadata {
>> > > > > >   long offset;
>> > > > > >   byte[] offsetMetadata;
>> > > > > >   String metadata;
>> > > > > > }
>> > > > > >
>> > > > > > Admittedly, the naming is a bit annoying, but we can probably come up with something better. Internally the byte array would have a version. If in the future we have anything else we need to add, we can update the version and we wouldn't need any new APIs.
>> > > > > We can also add fields to a class in a compatible way. So, it seems to me that the main advantage of the byte array is that it's opaque to the user. Is that correct? If so, we could also add any opaque metadata in a subclass so that users don't even see it (unless they cast it, but then they're on their own).
>> > > > > Ismael
>> > > > > The corresponding seek() and position() APIs might look something like this:
>> > > > > >
>> > > > > > void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
>> > > > > > byte[] positionMetadata(TopicPartition partition);
>> > > > > >
>> > > > > > What do you think?
>> > > > > > Thanks,
>> > > > > > Jason
>> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > Hey Jun, Jason,
>> > > > > > > Thanks much for all the feedback. I have updated the KIP based on the latest discussion. Can you help check whether it looks good?
>> > > > > > > Thanks,
>> > > > > > > Dong
>> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > Hey Jun,
>> > > > > > > > Hmm... thinking about this more, I am not sure that the proposed API is sufficient. For users that store offsets externally, we probably need an extra API to return the leader_epoch and partition_epoch for all partitions that consumers are consuming. I suppose these users currently use position() to get the offset. Thus we probably need a new method positionWithEpoch(..) to return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?
>> > > > > > > > Thanks,
>> > > > > > > > Dong
>> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > >> Hi, Dong,
>> > > > > > > >> Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).
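To make the OffsetEpoch outline in 67) concrete, here is a minimal sketch of what such an opaque, versioned encoding could look like; the version byte and the exact field layout are assumptions for illustration, not the KIP's actual wire format.

    import java.nio.ByteBuffer;

    public final class OffsetEpoch {
        private static final byte VERSION = 0;   // assumed version byte, kept for future evolution
        private final int partitionEpoch;
        private final int leaderEpoch;

        public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        // Compact binary encoding: 1 version byte followed by the two epochs.
        public byte[] encode() {
            return ByteBuffer.allocate(1 + 4 + 4)
                    .put(VERSION)
                    .putInt(partitionEpoch)
                    .putInt(leaderEpoch)
                    .array();
        }

        public static OffsetEpoch decode(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            byte version = buf.get();
            if (version != VERSION)
                throw new IllegalArgumentException("Unsupported OffsetEpoch version: " + version);
            return new OffsetEpoch(buf.getInt(), buf.getInt());
        }

        // Printable form for debugging, as suggested in 67).
        @Override
        public String toString() {
            return "OffsetEpoch(partitionEpoch=" + partitionEpoch + ", leaderEpoch=" + leaderEpoch + ")";
        }
    }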
>> > > > > > > >> Thanks,
>> > > > > > > >> Jun
>> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > >> > Hey Jun,
>> > > > > > > >> > Thanks much. I like the new API that you proposed. I am not sure what you exactly mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?
>> > > > > > > >> > Thanks,
>> > > > > > > >> > Dong
>> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > >> > > Hi, Dong,
>> > > > > > > >> > > Got it. The api that you proposed works. The question is whether that's the api that we want to have in the long term. My concern is that while the api change is simple, the new api seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().
>> > > > > > > >> > > An alternative approach is to make the following compatible api changes in Consumer.
>> > > > > > > >> > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no need to change the CommitSync() api)
>> > > > > > > >> > > * Add a new api seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old api seek(TopicPartition partition, long offset) in the future.
>> > > > > > > >> > > The alternative approach has a similar amount of api changes as yours but has the following benefits.
>> > > > > > > >> > > 1. The api works in a similar way as how offset management works now and is probably what we want in the long term.
>> > > > > > > >> > > 2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
>> > > > > > > >> > > 3. It can reset offsets better when a topic is recreated.
>> > > > > > > >> > > Thanks,
>> > > > > > > >> > > Jun
>> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > >> > > > Hey Jun,
>> > > > > > > >> > > > Yeah I agree that ideally we don't want an ever growing global metadata version. I just think it may be more desirable to keep the consumer API simple.
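As a small, self-contained illustration of the externally stored offset round trip that the alternative API above targets, the sketch below reuses the hypothetical OffsetEpoch encoding shown earlier and a plain Map standing in for the user's external store; on restart the decoded epoch would be handed back via the proposed seek(TopicPartition, long, OffsetEpoch) overload rather than the current two-argument seek().

    import java.util.HashMap;
    import java.util.Map;

    public final class ExternalOffsetStoreExample {
        public static void main(String[] args) {
            // Stand-in for the user's external offset store; a real store would also persist the offset.
            Map<String, byte[]> externalStore = new HashMap<>();

            // On shutdown: remember the offset together with its opaque epoch.
            long offset = 42L;
            OffsetEpoch epoch = new OffsetEpoch(7, 3);   // (partition_epoch, leader_epoch)
            externalStore.put("my-topic-0", epoch.encode());

            // On restart: decode the epoch and pass it back to the consumer, e.g. via the
            // proposed seek(partition, offset, offsetEpoch) overload (not shown here).
            OffsetEpoch restored = OffsetEpoch.decode(externalStore.get("my-topic-0"));
            System.out.println("Restoring offset " + offset + " with " + restored);
        }
    }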
>> > > > > > > >> > > > In my current proposal, the metadata version returned in the fetch response will be stored together with the offset. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponse and FetchResponse ever received by this consumer.
>> > > > > > > >> > > > We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user calls commitSync(...) to commit offset 10 for a given partition, for most use-cases, this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If the user calls commitSync(..) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use-case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?
>> > > > > > > >> > > > By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?
>> > > > > > > >> > > > Thanks,
>> > > > > > > >> > > > Dong
>> > > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > >> > > > > Hi, Dong,
>> > > > > > > >> > > > > Not sure that I fully understand your latest suggestion. Returning an ever growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored together with the offset if offsets are stored externally.
>> > > > > > > >> > > > > If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, on a consumer restart, it's not clear how we can ensure the metadata in the consumer is high enough since there is no metadata version to compare with.
>> > > > > > > >> > > > > Thanks,
>> > > > > > > >> > > > > Jun
>> > > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > >> > > > > > Hey Jun,
>> > > > > > > >> > > > > > Thanks much for the explanation.
>> > > > > > > >> > > > > > I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and the partition_epoch requires considerable change to the consumer's public API to take care of the case where the user stores offsets externally. For example, *consumer*.*commitSync*(..) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. *consumer*.*seek*(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for most users, who do not store offsets externally.
>> > > > > > > >> > > > > > After thinking more about it, we can address all problems discussed by only using the metadata_epoch without introducing leader_epoch or the partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if the user stores offsets externally. In order to address the scenario you described earlier, we can include metadata_epoch in the FetchResponse and the LeaderAndIsrRequest.
>> > > > > > > >> > > > > > The consumer remembers the largest metadata_epoch from all the FetchResponses it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponses and MetadataResponses ever received by this consumer.
>> > > > > > > >> > > > > > The drawback of using only the metadata_epoch is that we cannot always do the smart offset reset in case of unclean leader election, which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by the unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees OffsetOutOfRangeException. So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?
>> > > > > > > >> > > > > > Thanks,
>> > > > > > > >> > > > > > Dong
>> > > > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
>> > > > > > > >> > > > > > > Hi, Dong,
>> > > > > > > >> > > > > > > Thanks for the reply.
>> > > > > > > >> > > > > > > To solve the topic recreation issue, we could use either a global metadata version or a partition-level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking.
>> > > > > > > >> > > > > > > When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match that in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.
>> > > > > > > >> > > > > > > Jun
>> > > > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > >> > > > > > > > Hey Jun,
>> > > > > > > >> > > > > > > > This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.
>> > > > > > > >> > > > > > > > I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch) and it does not require any change to the message format or leader epoch.
It also allows us to order >> the >> > > > > metadata >> > > > > > > in >> > > > > > > >> a >> > > > > > > >> > > > > > > > straightforward manner which may be useful in >> > the >> > > > > > future. >> > > > > > > >> So it >> > > > > > > >> > > may >> > > > > > > >> > > > > be >> > > > > > > >> > > > > > a >> > > > > > > >> > > > > > > > better solution than generating a random >> > partition >> > > > > epoch >> > > > > > > >> every >> > > > > > > >> > > time >> > > > > > > >> > > > > we >> > > > > > > >> > > > > > > > create a partition. Does this sound >> reasonable? >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > Previously one concern with using the >> metadata >> > > > version >> > > > > > is >> > > > > > > >> that >> > > > > > > >> > > > > consumer >> > > > > > > >> > > > > > > > will be forced to refresh metadata even if >> > > metadata >> > > > > > > version >> > > > > > > >> is >> > > > > > > >> > > > > > increased >> > > > > > > >> > > > > > > > due to topics that the consumer is not >> > interested >> > > > in. >> > > > > > Now >> > > > > > > I >> > > > > > > >> > > > realized >> > > > > > > >> > > > > > that >> > > > > > > >> > > > > > > > this is probably not a problem. Currently >> client >> > > > will >> > > > > > > >> refresh >> > > > > > > >> > > > > metadata >> > > > > > > >> > > > > > > > either due to InvalidMetadataException in the >> > > > response >> > > > > > > from >> > > > > > > >> > > broker >> > > > > > > >> > > > or >> > > > > > > >> > > > > > due >> > > > > > > >> > > > > > > > to metadata expiry. The addition of the >> metadata >> > > > > version >> > > > > > > >> should >> > > > > > > >> > > > > > increase >> > > > > > > >> > > > > > > > the overhead of metadata refresh caused by >> > > > > > > >> > > > InvalidMetadataException. >> > > > > > > >> > > > > If >> > > > > > > >> > > > > > > > client refresh metadata due to expiry and it >> > > > receives >> > > > > a >> > > > > > > >> > metadata >> > > > > > > >> > > > > whose >> > > > > > > >> > > > > > > > version is lower than the current metadata >> > > version, >> > > > we >> > > > > > can >> > > > > > > >> > reject >> > > > > > > >> > > > the >> > > > > > > >> > > > > > > > metadata but still reset the metadata age, >> which >> > > > > > > essentially >> > > > > > > >> > keep >> > > > > > > >> > > > the >> > > > > > > >> > > > > > > > existing behavior in the client. >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > Thanks much, >> > > > > > > >> > > > > > > > Dong >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > >> > > > > > > >> > > > >> > > > > > > >> > > >> > > > > > > >> > >> > > > > > > >> >> > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> > >