Hey Jun,

Thanks much. I like the new API that you proposed. I am not sure what exactly you mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?
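If that is the case, the following is roughly what I have in mind. This is only a sketch for the discussion; the class name, fields, and accessors are placeholders rather than anything already in the KIP:

    // Sketch only: a holder for the (partition_epoch, leader_epoch) pair.
    // All names here are placeholders, not part of the proposal yet.
    public final class OffsetEpoch {
        private final int partitionEpoch; // bumped when the partition is (re)created
        private final int leaderEpoch;    // bumped on every leader change

        public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        public int partitionEpoch() { return partitionEpoch; }
        public int leaderEpoch() { return leaderEpoch; }
    }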
Thanks,
Dong

On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Got it. The api that you proposed works. The question is whether that's the api that we want to have in the long term. My concern is that while the api change is simple, the new api seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().
>
> An alternative approach is to make the following compatible api changes in Consumer.
> * Add an additional OffsetEpoch field in OffsetAndMetadata. (no need to change the commitSync() api)
> * Add a new api seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old api seek(TopicPartition partition, long offset) in the future.
>
> The alternative approach has a similar amount of api changes as yours but has the following benefits.
> 1. The api works in a similar way as how offset management works now and is probably what we want in the long term.
> 2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
> 3. It can reset offsets better when a topic is recreated.
>
> Thanks,
>
> Jun
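To make sure I am reading the alternative correctly, from the caller's side I am picturing something like the snippet below. The OffsetEpoch-aware OffsetAndMetadata constructor and the three-argument seek() are only my guesses at the proposed shape, not existing APIs, so please correct me if I have it wrong:

    // Fragment only: assumes an existing KafkaConsumer<?, ?> consumer and
    // partitionEpoch/leaderEpoch values obtained from the new metadata fields.
    // (imports: org.apache.kafka.common.TopicPartition,
    //  org.apache.kafka.clients.consumer.OffsetAndMetadata, java.util.Collections)
    TopicPartition tp = new TopicPartition("my-topic", 0);
    OffsetEpoch epoch = new OffsetEpoch(partitionEpoch, leaderEpoch);

    // commitSync() keeps its current signature; the epoch travels inside the
    // (hypothetical) OffsetEpoch-aware OffsetAndMetadata.
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(10L, "", epoch)));

    // When restoring an externally stored offset, the epoch is passed to the
    // (hypothetical) three-argument seek() so the consumer can tell whether its
    // current metadata is stale.
    consumer.seek(tp, 10L, epoch);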
> On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Yeah I agree that ideally we don't want an ever growing global metadata version. I just think it may be more desirable to keep the consumer API simple.
> >
> > In my current proposal, the metadata version returned in the fetch response will be stored with the offset together. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponse and FetchResponse ever received by this consumer.
> >
> > We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user calls commitSync(...) to commit offset 10 for a given partition, for most use-cases, this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If the user calls commitSync(...) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use-case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?
> >
> > By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?
> >
> > Thanks,
> > Dong
> >
> > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Not sure that I fully understand your latest suggestion. Returning an ever growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored with the offset together if offsets are stored externally. If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, on a consumer restart, it's not clear how we can ensure the metadata in the consumer is recent enough, since there is no metadata version to compare with.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks much for the explanation.
> > > >
> > > > I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and partition_epoch requires considerable change to the consumer's public API to take care of the case where the user stores offsets externally. For example, consumer.commitSync(..) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for most users, who do not store offsets externally.
> > > >
> > > > After thinking more about it, we can address all the problems discussed by using only the metadata_epoch, without introducing leader_epoch or partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if the user stores offsets externally. In order to address the scenario you described earlier, we can include metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch from all the FetchResponses it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponses and MetadataResponses ever received by this consumer.
> > > >
> > > > The drawback of using only the metadata_epoch is that we cannot always do the smart offset reset in case of the unclean leader election which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by the unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees OffsetOutOfRangeException. So I am not sure it is worth adding leader_epoch to the consumer API to address the remaining corner case. What do you think?
> > > >
> > > > Thanks,
> > > > Dong
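To make the bookkeeping in that email a bit more concrete, this is roughly the tracking I have in mind inside the consumer. The class, field, and method names are placeholders, not actual code from the KIP:

    // Sketch of the metadata_epoch bookkeeping (placeholder names, not KIP code).
    class MetadataEpochTracker {
        // Largest metadata_epoch seen in any MetadataResponse or FetchResponse.
        private int largestMetadataEpoch = -1;

        // Called whenever a response carrying a metadata_epoch is received.
        void onEpochFromResponse(int metadataEpoch) {
            largestMetadataEpoch = Math.max(largestMetadataEpoch, metadataEpoch);
        }

        // Value committed together with the offset, inside or outside Kafka.
        int epochToCommitWithOffset() {
            return largestMetadataEpoch;
        }
    }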
> > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > To solve the topic recreation issue, we could use either a global metadata version or a partition level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message.
> > > > >
> > > > > The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match those in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.
> > > > > >
> > > > > > I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch) and do not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?
> > > > > >
> > > > > > Previously one concern with using the metadata version was that the consumer will be forced to refresh metadata even if the metadata version is increased only due to topics that the consumer is not interested in. Now I realize that this is probably not a problem. Currently the client will refresh metadata either due to an InvalidMetadataException in the response from the broker or due to metadata expiry. The addition of the metadata version should not increase the overhead of metadata refresh caused by InvalidMetadataException. If the client refreshes metadata due to expiry and it receives a metadata whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.
> > > > > >
> > > > > > Thanks much,
> > > > > > Dong
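For the refresh behavior in that last paragraph, the client-side check I have in mind is roughly the following. The names are placeholders and the real client code is organized differently; this is only to spell out the rule:

    // Sketch only: placeholder names, not actual client code.
    interface MetadataSnapshot { int version(); }  // hypothetical snapshot with a version

    class MetadataState {
        private MetadataSnapshot current;
        private long lastSuccessfulRefreshMs;

        void maybeUpdate(MetadataSnapshot received, long nowMs) {
            if (current != null && received.version() < current.version()) {
                // Reject the stale metadata, but still reset the metadata age so the
                // client does not immediately re-fetch; this keeps today's behavior.
                lastSuccessfulRefreshMs = nowMs;
                return;
            }
            current = received;
            lastSuccessfulRefreshMs = nowMs;
        }
    }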