Hi, Dong,

Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).

Thanks,

Jun
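For concreteness, a minimal sketch of what an OffsetEpoch built from (partition_epoch, leader_epoch) could look like; the class name, field names and comparison rule below are assumptions for illustration, not the agreed API.

    // Illustrative sketch only: an OffsetEpoch as the pair (partition_epoch, leader_epoch).
    // The class name, field names and comparison rule are assumptions, not the final API.
    public final class OffsetEpoch {
        private final int partitionEpoch;
        private final int leaderEpoch;

        public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        public int partitionEpoch() { return partitionEpoch; }

        public int leaderEpoch() { return leaderEpoch; }

        // An epoch is newer if its partition epoch is higher (topic re-created), or if the
        // partition epoch matches and the leader epoch is higher (leadership changed).
        public boolean isNewerThan(OffsetEpoch other) {
            if (partitionEpoch != other.partitionEpoch)
                return partitionEpoch > other.partitionEpoch;
            return leaderEpoch > other.leaderEpoch;
        }
    }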
On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,

> Thanks much. I like the new API that you proposed. I am not sure exactly what you mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?

> Thanks,
> Dong

> On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:

> > Hi, Dong,

> > Got it. The api that you proposed works. The question is whether that's the api that we want to have in the long term. My concern is that while the api change is simple, the new api seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().

> > An alternative approach is to make the following compatible api changes in Consumer.
> > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no need to change the commitSync() api)
> > * Add a new api seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old api seek(TopicPartition partition, long offset) in the future.

> > The alternative approach has a similar amount of api changes as yours but has the following benefits.
> > 1. The api works in a similar way as how offset management works now and is probably what we want in the long term.
> > 2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
> > 3. It can reset offsets better when a topic is recreated.

> > Thanks,
> > Jun

> > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:

> > > Hey Jun,

> > > Yeah, I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.

> > > In my current proposal, the metadata version returned in the fetch response will be stored together with the offset. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponse and FetchResponse ever received by this consumer.

> > > We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user calls commitSync(...) to commit offset 10 for a given partition, for most use cases, this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If the user calls commitSync(...) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?

> > > By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?

> > > Thanks,
> > > Dong
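To make the advanced use case above concrete, here is a rough sketch of committing an offset together with a metadata_epoch by encoding the epoch into the free-form OffsetAndMetadata.metadata string. The metadataEpoch() accessor mentioned above is a proposal in this thread and does not exist in the released consumer, so the epoch value is simply passed in as an argument here.

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative sketch only. commitSync(Map<TopicPartition, OffsetAndMetadata>) and the
    // free-form metadata string are existing consumer features; where the metadata_epoch
    // value comes from (the proposed metadataEpoch() API) is assumed, not implemented.
    public class CommitWithMetadataEpoch {

        static void commitWithEpoch(KafkaConsumer<?, ?> consumer,
                                    TopicPartition partition,
                                    long offset,
                                    long metadataEpoch) {
            // Encode the epoch into the metadata string so it is stored with the offset
            // (in the offsets topic or an external store) and can be parsed back on restart.
            OffsetAndMetadata committed =
                    new OffsetAndMetadata(offset, "metadata_epoch=" + metadataEpoch);
            consumer.commitSync(Collections.singletonMap(partition, committed));
        }
    }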
> > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:

> > > > Hi, Dong,

> > > > Not sure that I fully understand your latest suggestion. Returning an ever-growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored together with the offset if offsets are stored externally. If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, on a consumer restart, it's not clear how we can ensure the metadata in the consumer is high enough, since there is no metadata version to compare with.

> > > > Thanks,
> > > > Jun

> > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:

> > > > > Hey Jun,

> > > > > Thanks much for the explanation.

> > > > > I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and the partition_epoch requires considerable changes to the consumer's public API to take care of the case where the user stores offsets externally. For example, consumer.commitSync(...) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward-compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for most users, who do not store offsets externally.

> > > > > After thinking more about it, we can address all the problems discussed by only using the metadata_epoch, without introducing leader_epoch or the partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if the user stores offsets externally. In order to address the scenario you described earlier, we can include metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch from all the FetchResponse it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponse and MetadataResponse ever received by this consumer.

> > > > > The drawback of using only the metadata_epoch is that we cannot always do the smart offset reset in the case of unclean leader election, which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees OffsetOutOfRangeException. So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?

> > > > > Thanks,
> > > > > Dong
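A rough sketch of the bookkeeping described above, assuming the proposed metadata_epoch field is carried in both MetadataResponse and FetchResponse; the class and method names are illustrative only, and the rule shown is simply "commit the largest epoch ever seen".

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch only: keep the largest metadata_epoch observed across all
    // MetadataResponse and FetchResponse, and commit that value together with the offset.
    public class MetadataEpochTracker {
        private final AtomicLong largestMetadataEpoch = new AtomicLong(-1L);

        // Called for every response that carries the (proposed) metadata_epoch field.
        public void onResponse(long metadataEpochFromResponse) {
            largestMetadataEpoch.accumulateAndGet(metadataEpochFromResponse, Math::max);
        }

        // The epoch to store with the committed offset, inside or outside Kafka.
        public long epochToCommit() {
            return largestMetadataEpoch.get();
        }
    }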
> > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:

> > > > > > Hi, Dong,

> > > > > > Thanks for the reply.

> > > > > > To solve the topic recreation issue, we could use either a global metadata version or a partition-level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match those in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.

> > > > > > Jun
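A small sketch of the validity check implied by the proposal above; how the broker's current partition epoch and leader epoch are obtained is out of scope here, so they are simply passed in, and all names are illustrative.

    // Illustrative sketch only: a committed <offset, leader epoch, partition epoch> tuple
    // checked against the broker's current epochs to detect topic re-creation.
    public class CommittedPosition {
        final long offset;
        final int leaderEpoch;
        final int partitionEpoch;

        CommittedPosition(long offset, int leaderEpoch, int partitionEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }

        // If the partition epoch no longer matches, the topic was deleted and re-created
        // since the offset was committed, so the offset is treated as out of range and the
        // consumer's offset reset policy applies.
        boolean isStillValid(int brokerPartitionEpoch, int brokerLeaderEpoch) {
            if (partitionEpoch != brokerPartitionEpoch)
                return false;
            // A committed leader epoch newer than the broker's indicates stale broker metadata
            // (or data loss from unclean leader election), so the position is not trusted either.
            return leaderEpoch <= brokerLeaderEpoch;
        }
    }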
> > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:

> > > > > > > Hey Jun,

> > > > > > > This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.

> > > > > > > I think the remaining question is how to address the scenario that the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch) and do not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?

> > > > > > > Previously one concern with using the metadata version is that the consumer will be forced to refresh metadata even if the metadata version is increased due to topics that the consumer is not interested in. Now I realized that this is probably not a problem. Currently the client refreshes metadata either due to InvalidMetadataException in the response from the broker or due to metadata expiry. The addition of the metadata version should not increase the overhead of metadata refresh caused by InvalidMetadataException. If the client refreshes metadata due to expiry and receives metadata whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.

> > > > > > > Thanks much,
> > > > > > > Dong
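To illustrate the refresh rule in the last paragraph, here is a minimal sketch, assuming the client tracks a single current metadata version; the class and method names are invented for illustration and are not taken from the actual Kafka client code.

    // Illustrative sketch only: on an expiry-driven refresh, metadata with a lower version
    // than the client's current version is rejected, but the metadata age is still reset,
    // which keeps the existing refresh behavior in the client.
    public class ClientMetadataState {
        private long currentVersion = -1L;
        private long lastRefreshMs = 0L;

        public synchronized void maybeUpdate(long responseVersion, long nowMs) {
            lastRefreshMs = nowMs;               // reset the metadata age either way
            if (responseVersion < currentVersion)
                return;                          // stale metadata: reject its contents
            currentVersion = responseVersion;
            // ... update cached topic / partition / leader information here ...
        }

        public synchronized boolean isExpired(long nowMs, long metadataMaxAgeMs) {
            return nowMs - lastRefreshMs >= metadataMaxAgeMs;
        }
    }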