Hi Dong,

Thanks for the follow-up! I finally have a much clearer understanding of where you are coming from.
You are right. The success of findOffsets()/finding a point of non-divergence depends on whether we have enough entries in the consumer's leader epoch cache. However, I think this is a fundamental limitation of having a leader epoch cache that does not persist across consumer restarts. If we consider the general case where the consumer may or may not have this cache, then I see two paths: 1) Letting the user track the leader epoch history externally, and adding more exposure to the leader epoch and to finding the point of non-divergence in the KafkaConsumer API. I understand this is the case you were talking about.

On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com> wrote:
> Hey Anna,
>
> Thanks much for your detailed explanation and example! It does help me understand the difference between our understandings.
>
> So it seems that the solution based on findOffsets() currently focuses mainly on the scenario that the consumer has a cached leaderEpoch -> offset mapping, whereas I was thinking about the general case where the consumer may or may not have this cache. I guess that is why we have different understandings here. I have some comments below.
>
> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed by seek(offset) works if the consumer has the cached leaderEpoch -> offset mapping. But if we assume the consumer has this cache, do we need to have leaderEpoch in findOffsets(...)? Intuitively, findOffsets(offset) can also derive the leaderEpoch using the offset, just like the proposed solution does with seek(offset).
>
> 4) If the consumer does not have a cached leaderEpoch -> offset mapping, which is the case if the consumer is restarted on a new machine, then it is not clear what leaderEpoch would be included in the FetchRequest if the consumer does seek(offset). This is the case that motivates the first question of the previous email. In general, maybe we should discuss the final solution that covers all cases?
> > > 5) The second question in my previous email is related to the following > paragraph: > > "... In some cases, offsets returned from position() could be actual > consumed messages by this consumer identified by {offset, leader epoch}. In > other cases, position() returns offset that was not actually consumed. > Suppose, the user calls position() for the last offset...". > > I guess my point is that, if user calls position() for the last offset and > uses that offset in seek(...), then user can probably just call > Consumer#seekToEnd() without calling position() and seek(...). Similarly > user can call Consumer#seekToBeginning() to the seek to the earliest > position without calling position() and seek(...). Thus position() only > needs to return the actual consumed messages identified by {offset, leader > epoch}. Does this make sense? > > > Thanks, > Dong > > > On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io> wrote: > > > Hi Dong, > > > > > > Thanks for considering my suggestions. > > > > > > Based on your comments, I realized that my suggestion was not complete > with > > regard to KafkaConsumer API vs. consumer-broker protocol. While I propose > > to keep KafkaConsumer#seek() unchanged and take offset only, the > underlying > > consumer will send the next FetchRequest() to broker with offset and > > leaderEpoch if it is known (based on leader epoch cache in consumer) — > note > > that this is different from the current KIP, which suggests to always > send > > unknown leader epoch after seek(). This way, if the consumer and a broker > > agreed on the point of non-divergence, which is some {offset, > leaderEpoch} > > pair, the new leader which causes another truncation (even further back) > > will be able to detect new divergence and restart the process of finding > > the new point of non-divergence. 
So, to answer your question, If the > > truncation happens just after the user calls > > KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset), > > the user will not seek to the wrong position without knowing that > > truncation has happened, because the consumer will get another truncation > > error, and seek again. > > > > > > I am afraid, I did not understand your second question. Let me summarize > my > > suggestions again, and then give an example to hopefully make my > > suggestions more clear. Also, the last part of my example shows how the > > use-case in your first question will work. If it does not answer your > > second question, would you mind clarifying? I am also focusing on the > case > > of a consumer having enough entries in the cache. The case of restarting > > from committed offset either stored externally or internally will > probably > > need to be discussed more. > > > > > > Let me summarize my suggestion again: > > > > 1) KafkaConsumer#seek() and KafkaConsumer#position() remains unchanged > > > > 2) New KafkaConsumer#findOffsets() takes {offset, leaderEpoch} pair per > > topic partition and returns offset per topic partition. > > > > 3) FetchRequest() to broker after KafkaConsumer#seek() will contain the > > offset set by seek and leaderEpoch that corresponds to the offset based > on > > leader epoch cache in the consumer. > > > > > > The rest of this e-mail is a long and contrived example with several log > > truncations and unclean leader elections to illustrate the API and your > > first use-case. Suppose we have three brokers. Initially, Broker A, B, > and > > C has one message at offset 0 with leader epoch 0. Then, Broker A goes > down > > for some time. Broker B becomes a leader with epoch 1, and writes > messages > > to offsets 1 and 2. Broker C fetches offset 1, but before fetching offset > > 2, becomes a leader with leader epoch 2 and writes a message at offset 2. 
> > Here is the state of brokers at this point:
> >
> > > Broker A:
> > > offset 0, epoch 0 <— leader
> > > goes down…
> >
> > > Broker B:
> > > offset 0, epoch 0
> > > offset 1, epoch 1 <— leader
> > > offset 2, epoch 1
> >
> > > Broker C:
> > > offset 0, epoch 0
> > > offset 1, epoch 1
> > > offset 2, epoch 2 <— leader
> >
> > Before Broker C becomes a leader with leader epoch 2, the consumer consumed the following messages from broker A and broker B: {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2, leaderEpoch=1}.
> >
> > The consumer's leader epoch cache at this point contains the following entries:
> >
> > (leaderEpoch=0, startOffset=0)
> > (leaderEpoch=1, startOffset=1)
> > endOffset = 3
> >
> > Then, broker B becomes the follower of broker C, truncates, and starts fetching from offset 2.
> >
> > The consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a LOG_TRUNCATION error from broker C.
> >
> > In response, the client calls KafkaConsumer#findOffsets(offset=3, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker C responds with {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3, leaderEpoch=1) returns offset=2.
> >
> > In response, the consumer calls KafkaConsumer#seek(offset=2) followed by poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker C.
> >
> > I will continue with this example with the goal to answer your first question about truncation just after findOffsets() followed by seek():
> >
> > Suppose brokers B and C go down, and broker A comes up and becomes a leader with leader epoch 3, and writes a message to offset 1. Suppose this happens before the consumer gets a response from broker C to the previous fetch request: FetchRequest(offset=2, leaderEpoch=1).
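The cache-based findOffsets() resolution in this example can be modeled in a few lines of plain Java. This is only a toy sketch — `EpochCache`, `record`, and `findOffset` are invented names, not the consumer's actual internals — but it reproduces the resolution above (returning offset=2), and the same rule also handles the second truncation described next (returning offset=1).

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of the consumer-side leader epoch cache from this example.
class EpochCache {
    // leaderEpoch -> start offset of that epoch, as seen by this consumer
    private final NavigableMap<Integer, Long> startOffsets = new TreeMap<>();
    private long logEndOffset; // next offset this consumer would fetch

    void record(long offset, int leaderEpoch) {
        startOffsets.putIfAbsent(leaderEpoch, offset);
        logEndOffset = offset + 1;
    }

    // End offset of an epoch from this consumer's perspective: the start of
    // the next cached epoch, or the consumer's own log end offset.
    long endOffsetFor(int epoch) {
        Integer next = startOffsets.higherKey(epoch);
        return next != null ? startOffsets.get(next) : logEndOffset;
    }

    // findOffsets(): the point of non-divergence is the smaller of the
    // broker's reported end offset for the epoch and the consumer's own.
    long findOffset(int respEpoch, long respEndOffset) {
        return Math.min(respEndOffset, endOffsetFor(respEpoch));
    }
}

public class FindOffsetsSketch {
    public static void main(String[] args) {
        EpochCache cache = new EpochCache();
        cache.record(0, 0); // {offset=0, leaderEpoch=0}
        cache.record(1, 1); // {offset=1, leaderEpoch=1}
        cache.record(2, 1); // {offset=2, leaderEpoch=1}

        // Broker C answers OffsetsForLeaderEpoch(leaderEpoch=1) with
        // {leaderEpoch=1, endOffset=2} -> findOffsets returns 2.
        System.out.println(cache.findOffset(1, 2));

        // Later, broker A answers with {leaderEpoch=0, endOffset=1}; the
        // consumer's own end offset for epoch 0 is 1 -> findOffsets returns 1.
        System.out.println(cache.findOffset(0, 1));
    }
}
```

The min() captures the intuition in the thread: neither the broker's history for that epoch nor the consumer's own consumed history extends past the point of non-divergence.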
> > The consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which returns a LOG_TRUNCATION error, because broker A has leader epoch 3 > leader epoch in FetchRequest, with starting offset = 1 < offset 2 in FetchRequest().
> >
> > In response, the user calls KafkaConsumer#findOffsets(offset=2, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker A responds with {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch = 0 in its cache with end offset == 1, which results in KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset = 1.
> >
> > In response, the user calls KafkaConsumer#seek(offset=1) followed by poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to broker A, which responds with the message at offset 1, leader epoch 3.
> >
> > I will think some more about consumers restarting from committed offsets, and send a follow up.
> >
> > Thanks,
> > Anna
> >
> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:
> > > Hey Anna,
> > >
> > > Thanks much for the thoughtful reply. It makes sense to differentiate between "seeking to a message" and "seeking to a position". I have two questions here:
> > >
> > > - For the "seeking to a message" use-case, with the proposed approach the user needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If message truncation and message append happen immediately after findOffset(offset, leaderEpoch) but before seek(offset), it seems that the user will seek to the wrong message without knowing that truncation has happened. Would this be a problem?
> > >
> > > - For the "seeking to a position" use-case, it seems that there can be two positions, i.e. earliest and latest. So these two cases can be fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd().
Then > it > > > seems that user will only need to call position() and seek() for > "seeking > > > to a message" use-case? > > > > > > Thanks, > > > Dong > > > > > > > > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> > wrote: > > > > > > > Hi Jason and Dong, > > > > > > > > > > > > I’ve been thinking about your suggestions and discussion regarding > > > > position(), seek(), and new proposed API. > > > > > > > > > > > > Here is my thought process why we should keep position() and seek() > API > > > > unchanged. > > > > > > > > > > > > I think we should separate {offset, leader epoch} that uniquely > > > identifies > > > > a message from an offset that is a position. In some cases, offsets > > > > returned from position() could be actual consumed messages by this > > > consumer > > > > identified by {offset, leader epoch}. In other cases, position() > > returns > > > > offset that was not actually consumed. Suppose, the user calls > > position() > > > > for the last offset. Suppose we return {offset, leader epoch} of the > > > > message currently in the log. Then, the message gets truncated before > > > > consumer’s first poll(). It does not make sense for poll() to fail in > > > this > > > > case, because the log truncation did not actually happen from the > > > consumer > > > > perspective. On the other hand, as the KIP proposes, it makes sense > for > > > the > > > > committed() method to return {offset, leader epoch} because those > > offsets > > > > represent actual consumed messages. > > > > > > > > > > > > The same argument applies to the seek() method — we are not seeking > to > > a > > > > message, we are seeking to a position. > > > > > > > > > > > > I like the proposal to add KafkaConsumer#findOffsets() API. 
I am > > assuming > > > > something like: > > > > > > > > Map<TopicPartition, Long> findOffsets(Map<TopicPartition, > > OffsetAndEpoch> > > > > offsetsToSearch) > > > > > > > > Similar to seek() and position(), I think findOffsets() should return > > > > offset without leader epoch, because what we want is the offset that > we > > > > think is closest to the not divergent message from the given consumed > > > > message. Until the consumer actually fetches the message, we should > not > > > let > > > > the consumer store the leader epoch for a message it did not consume. > > > > > > > > > > > > So, the workflow will be: > > > > > > > > 1) The user gets LogTruncationException with {offset, leader epoch of > > the > > > > previous message} (whatever we send with new FetchRecords request). > > > > > > > > 2) offset = findOffsets(tp -> {offset, leader epoch}) > > > > > > > > 3) seek(offset) > > > > > > > > > > > > For the use-case where the users store committed offsets externally: > > > > > > > > 1) Such users would have to track the leader epoch together with an > > > offset. > > > > Otherwise, there is no way to detect later what leader epoch was > > > associated > > > > with the message. I think it’s reasonable to ask that from users if > > they > > > > want to detect log truncation. Otherwise, they will get the current > > > > behavior. > > > > > > > > > > > > If the users currently get an offset to be stored using position(), I > > see > > > > two possibilities. First, they call save offset returned from > > position() > > > > that they call before poll(). In that case, it would not be correct > to > > > > store {offset, leader epoch} if we would have changed position() to > > > return > > > > {offset, leader epoch} since actual fetched message could be > different > > > > (from the example I described earlier). So, it would be more correct > to > > > > call position() after poll(). 
However, the user already gets > > > > ConsumerRecords at this point, from which the user can extract > {offset, > > > > leader epoch} of the last message. > > > > > > > > > > > > So, I like the idea of adding a helper method to ConsumerRecords, as > > > Jason > > > > proposed, something like: > > > > > > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where > OffsetAndEpoch > > > is > > > > a data struct holding {offset, leader epoch}. > > > > > > > > > > > > In this case, we would advise the user to follow the workflow: > poll(), > > > get > > > > {offset, leader epoch} from ConsumerRecords#lastOffsetWith > > LeaderEpoch(), > > > > save offset and leader epoch, process records. > > > > > > > > > > > > 2) When the user needs to seek to the last committed offset, they > call > > > new > > > > findOffsets(saved offset, leader epoch), and then seek(offset). > > > > > > > > > > > > What do you think? > > > > > > > > > > > > Thanks, > > > > > > > > Anna > > > > > > > > > > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote: > > > > > > > > > Hey Jason, > > > > > > > > > > Thanks much for your thoughtful explanation. > > > > > > > > > > Yes the solution using findOffsets(offset, leaderEpoch) also works. > > The > > > > > advantage of this solution it adds only one API instead of two > APIs. > > > The > > > > > concern is that its usage seems a bit more clumsy for advanced > users. > > > > More > > > > > specifically, advanced users who store offsets externally will > always > > > > need > > > > > to call findOffsets() before calling seek(offset) during consumer > > > > > initialization. And those advanced users will need to manually keep > > > track > > > > > of the leaderEpoch of the last ConsumerRecord. > > > > > > > > > > The other solution may be more user-friendly for advanced users is > to > > > add > > > > > two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) = > > > > > offsetEpochs(topicPartition)`. 
> > > > > > > > > > I kind of prefer the second solution because it is easier to use > for > > > > > advanced users. If we need to expose leaderEpoch anyway to safely > > > > identify > > > > > a message, it may be conceptually simpler to expose it directly in > > > > > seek(...) rather than requiring one more translation using > > > > > findOffsets(...). But I am also OK with the first solution if other > > > > > developers also favor that one :) > > > > > > > > > > Thanks, > > > > > Dong > > > > > > > > > > > > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson < > ja...@confluent.io > > > > > > > > wrote: > > > > > > > > > > > Hi Dong, > > > > > > > > > > > > Thanks, I've been thinking about your suggestions a bit. It is > > > > > challenging > > > > > > to make this work given the current APIs. One of the difficulties > > is > > > > that > > > > > > we don't have an API to find the leader epoch for a given offset > at > > > the > > > > > > moment. So if the user does a seek to offset 5, then we'll need a > > new > > > > API > > > > > > to find the corresponding epoch in order to fulfill the new > > > position() > > > > > API. > > > > > > Potentially we could modify ListOffsets to enable finding the > > leader > > > > > epoch, > > > > > > but I am not sure it is worthwhile. Perhaps it is reasonable for > > > > advanced > > > > > > usage to expect that the epoch information, if needed, will be > > > > extracted > > > > > > from the records directly? It might make sense to expose a helper > > in > > > > > > `ConsumerRecords` to make this a little easier though. > > > > > > > > > > > > Alternatively, if we think it is important to have this > information > > > > > exposed > > > > > > directly, we could create batch APIs to solve the naming problem. 
> > For > > > > > > example: > > > > > > > > > > > > Map<TopicPartition, OffsetAndEpoch> positions(); > > > > > > void seek(Map<TopicPartition, OffsetAndEpoch> positions); > > > > > > > > > > > > However, I'm actually leaning toward leaving the seek() and > > > position() > > > > > APIs > > > > > > unchanged. Instead, we can add a new API to search for offset by > > > > > timestamp > > > > > > or by offset/leader epoch. Let's say we call it `findOffsets`. If > > the > > > > > user > > > > > > hits a log truncation error, they can use this API to find the > > > closest > > > > > > offset and then do a seek(). At the same time, we deprecate the > > > > > > `offsetsForTimes` APIs. We now have two use cases which require > > > finding > > > > > > offsets, so I think we should make this API general and leave the > > > door > > > > > open > > > > > > for future extensions. > > > > > > > > > > > > By the way, I'm unclear about the desire to move part of this > > > > > functionality > > > > > > to AdminClient. Guozhang suggested this previously, but I think > it > > > only > > > > > > makes sense for cross-cutting capabilities such as topic > creation. > > If > > > > we > > > > > > have an API which is primarily useful by consumers, then I think > > > that's > > > > > > where it should be exposed. The AdminClient also has its own API > > > > > integrity > > > > > > and should not become a dumping ground for advanced use cases. > I'll > > > > > update > > > > > > the KIP with the `findOffsets` API suggested above and we can > see > > if > > > > it > > > > > > does a good enough job of keeping the API simple for common > cases. 
> > > > > > > > > > > > Thanks, > > > > > > Jason > > > > > > > > > > > > > > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> > > > wrote: > > > > > > > > > > > > > Hey Jason, > > > > > > > > > > > > > > Regarding seek(...), it seems that we want an API for user to > > > > > initialize > > > > > > > consumer with (offset, leaderEpoch) and that API should allow > > > > throwing > > > > > > > PartitionTruncationException. Suppose we agree on this, then > > > > > > > seekToNearest() is not sufficient because it will always > swallow > > > > > > > PartitionTruncationException. Here we have two options. The > first > > > > > option > > > > > > is > > > > > > > to add API offsetsForLeaderEpochs() to translate (leaderEpoch, > > > > offset) > > > > > to > > > > > > > offset. The second option is to have add seek(offset, > > leaderEpoch). > > > > It > > > > > > > seems that second option may be more simpler because it makes > it > > > > clear > > > > > > that > > > > > > > (offset, leaderEpoch) will be used to identify consumer's > > position > > > > in a > > > > > > > partition. And user only needs to handle > > > PartitionTruncationException > > > > > > from > > > > > > > the poll(). In comparison the first option seems a bit harder > to > > > use > > > > > > > because user have to also handle the > PartitionTruncationException > > > if > > > > > > > offsetsForLeaderEpochs() returns different offset from > > > user-provided > > > > > > > offset. What do you think? > > > > > > > > > > > > > > If we decide to add API seek(offset, leaderEpoch), then we can > > > decide > > > > > > > whether and how to add API to translate (offset, leaderEpoch) > to > > > > > offset. > > > > > > It > > > > > > > seems that this API will be needed by advanced user to don't > want > > > > auto > > > > > > > offset reset (so that it can be notified) but still wants to > > reset > > > > > offset > > > > > > > to closest. 
> > > > > > > For those users it probably makes sense to only have the API in AdminClient. offsetsForTimes() seems like a common API that will be needed by users of the consumer in general, so it may be more reasonable for it to stay in the consumer API. I don't have a strong opinion on whether offsetsForTimes() should be replaced by an API in AdminClient.
> > > > > > >
> > > > > > > Though (offset, leaderEpoch) is needed to uniquely identify a message in general, it is only needed for advanced users who have turned on unclean leader election, need to use seek(..), and don't want auto offset reset. Most other users probably just want to enable auto offset reset and store offsets in Kafka. Thus we might want to keep the existing offset-only APIs (e.g. seek() and position()) for most users while adding new APIs for advanced users. And yes, it seems that we need a new name for position().
> > > > > > >
> > > > > > > Though I think we need new APIs to carry the new information (e.g. leaderEpoch), I am not very sure what that should look like. One possible option is those APIs in KIP-232. Another option is something like this:
> > > > > > >
> > > > > > > `````
> > > > > > > class OffsetEpochs {
> > > > > > >   long offset;
> > > > > > >   int leaderEpoch;
> > > > > > >   int partitionEpoch; // This may be needed later as discussed in KIP-232
> > > > > > >   ... // Hopefully these are all we need to identify a message in Kafka. But if we need more then we can add new fields in this class.
> > > > > > > }
> > > > > > >
> > > > > > > OffsetEpochs offsetEpochs(TopicPartition);
> > > > > > >
> > > > > > > void seek(TopicPartition, OffsetEpochs);
> > > > > > > ``````
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > Hey Dong,
> > > > > > > >
> > > > > > > > Thanks for the feedback. The first three points are easy:
> > > > > > > >
> > > > > > > > 1. Yes, we should be consistent.
> > > > > > > > 2. Yes, I will add this.
> > > > > > > > 3. Yes, I think we should document the changes to the committed offset schema. I meant to do this, but it slipped my mind.
> > > > > > > >
> > > > > > > > The latter questions are tougher. One option I was considering is to have only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new seek() API. That seems more consistent with the current use of `offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An alternative might be to take a page from the AdminClient API and add a new method to generalize offset lookup. For example, we could have `lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes` and this would open the door for future extensions without needing new APIs.
> > > > > > > >
> > > > > > > > The case of position() is a little more annoying. It would have been better had we let this return an object so that it is easier to extend. This is the only reason I didn't add the API to the KIP.
Maybe we > > should > > > > bite > > > > > > the > > > > > > > > bullet and fix this now? Unfortunately we'll have to come up > > > with a > > > > > new > > > > > > > > name. Maybe `currentPosition`? > > > > > > > > > > > > > > > > Thoughts? > > > > > > > > > > > > > > > > -Jason > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin < > > lindon...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > > > > > Regarding points 4) and 5) above, motivation for the > > > alternative > > > > > APIs > > > > > > > is > > > > > > > > > that, if we decide that leaderEpoch is equally important as > > > > offset > > > > > in > > > > > > > > > identifying a message, then it may be reasonable to always > > > > specify > > > > > it > > > > > > > > > wherever offset is currently required in the consumer API > to > > > > > > identify a > > > > > > > > > message, e.g. position(), seek(). For example, since we > allow > > > > user > > > > > to > > > > > > > > > retrieve offset using position() instead of asking user to > > keep > > > > > track > > > > > > > of > > > > > > > > > the offset of the latest ConsumerRecord, may be it will be > > more > > > > > > > > consistent > > > > > > > > > for user to also retrieve leaderEpoch using position()? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin < > > > lindon...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hey Jason, > > > > > > > > > > > > > > > > > > > > Thanks for the update. It looks pretty good. Just some > > minor > > > > > > comments > > > > > > > > > > below: > > > > > > > > > > > > > > > > > > > > 1) The KIP adds new error code "LOG_TRUNCATION" and new > > > > exception > > > > > > > > > TruncatedPartitionException. > > > > > > > > > > Can we make the name more consistent, e.g. > > > > > LogTruncationException? 
> > > > > > > > > > > > > > > > > > > > 2) Do we need to add UnknownLeaderEpochException as part > of > > > API > > > > > > > change? > > > > > > > > > > > > > > > > > > > > 3) Not sure if the offset topic schema is also public > API. > > If > > > > so, > > > > > > > maybe > > > > > > > > > we > > > > > > > > > > should also include the schema change in the API? > > > > > > > > > > > > > > > > > > > > 4) For users who store offset externally, currently they > > get > > > > > offset > > > > > > > > using > > > > > > > > > > position(..), store the offset externally, and use > seek(..) > > > to > > > > > > > > initialize > > > > > > > > > > the consumer next time. After this KIP they will need to > > > store > > > > > and > > > > > > > use > > > > > > > > > the > > > > > > > > > > leaderEpoch together with the offset. Should we also > update > > > the > > > > > API > > > > > > > so > > > > > > > > > that > > > > > > > > > > user can also get leaderEpoch from position(...)? Not > sure > > if > > > > it > > > > > is > > > > > > > OK > > > > > > > > to > > > > > > > > > > ask user to track the latest leaderEpoch of > ConsumerRecord > > by > > > > > > > > themselves. > > > > > > > > > > > > > > > > > > > > 5) Also for users who store offset externally, they need > to > > > > call > > > > > > > > seek(..) > > > > > > > > > > with leaderEpoch to initialize consumer. With current KIP > > > users > > > > > > need > > > > > > > to > > > > > > > > > > call seekToNearest(), whose name suggests that the final > > > > position > > > > > > may > > > > > > > > be > > > > > > > > > > different from what was requested. However, if users may > > want > > > > to > > > > > > > avoid > > > > > > > > > auto > > > > > > > > > > offset reset and be notified explicitly when there is log > > > > > > truncation, > > > > > > > > > then seekToNearest() > > > > > > > > > > probably does not help here. 
> > > > > > > > > > Would it make sense to replace seekToNearest() with seek(offset, leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > > > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > >> Hey Guozhang,
> > > > > > > > > >>
> > > > > > > > > >> That's fair. In fact, perhaps we do not need this API at all. We already have the new seek() in this KIP which can do the lookup based on epoch for this use case. I guess we should probably call it seekToNearest() though to make it clear that the final position may be different from what was requested.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >> Jason
> > > > > > > > > >>
> > > > > > > > > >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > >> > Hi Jason,
> > > > > > > > > >> >
> > > > > > > > > >> > I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs, since probably only very advanced users are aware of the leaderEpoch, and hence ever care to use it anyways.
> > > > > > > > > >> > It is more like an admin client operation than a consumer client operation: if the motivation is to facilitate customized reset policies, maybe adding it as AdminClient#offsetsForLeaderEpochs is better, as it is not an aggressive assumption that such advanced users are willing to use some admin client to get further information?
> > > > > > > > > >> >
> > > > > > > > > >> > Guozhang
> > > > > > > > > >> >
> > > > > > > > > >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > >> > > Thanks for the feedback. I've updated the KIP. Specifically I removed the "closest" reset option and the proposal to reset by timestamp when the precise truncation point cannot be determined. Instead, I proposed that we always reset using the nearest epoch when a reset policy is defined (either "earliest" or "latest"). Does that sound reasonable?
> > > > > > > > > >> > >
> > > > > > > > > >> > > One thing I am still debating is whether it would be better to have a separate API to find the closest offset using the leader epoch.
> > > > > > > In > > > > > > > > > the > > > > > > > > > >> > > current KIP, I suggested to piggyback this > information > > > on > > > > an > > > > > > > > > >> exception, > > > > > > > > > >> > but > > > > > > > > > >> > > I'm beginning to think it would be better not to > hide > > > the > > > > > > > lookup. > > > > > > > > It > > > > > > > > > >> is > > > > > > > > > >> > > awkward to implement since it means delaying the > > > exception > > > > > and > > > > > > > the > > > > > > > > > API > > > > > > > > > >> > may > > > > > > > > > >> > > actually be useful when customizing reset logic if > no > > > auto > > > > > > reset > > > > > > > > > >> policy > > > > > > > > > >> > is > > > > > > > > > >> > > defined. I was thinking we can add an API like the > > > > > following: > > > > > > > > > >> > > > > > > > > > > > >> > > Map<TopicPartition, OffsetAndEpoch> > > > > > > > > > >> > > offsetsForLeaderEpochs(Map<TopicPartition, Integer> > > > > > > > > epochsToSearch) > > > > > > > > > >> > > > > > > > > > > > >> > > Thoughts? > > > > > > > > > >> > > > > > > > > > > > >> > > -Jason > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson < > > > > > > > > > ja...@confluent.io > > > > > > > > > >> > > > > > > > > > > >> > > wrote: > > > > > > > > > >> > > > > > > > > > > > >> > > > @Dong > > > > > > > > > >> > > > > > > > > > > > > >> > > > Those are fair points. Both approaches require > some > > > > > > fuzziness > > > > > > > to > > > > > > > > > >> reset > > > > > > > > > >> > > the > > > > > > > > > >> > > > offset in these pathological scenarios and we > cannot > > > > > > guarantee > > > > > > > > > >> > > > at-least-once delivery either way unless we have > the > > > > full > > > > > > > > history > > > > > > > > > of > > > > > > > > > >> > > leader > > > > > > > > > >> > > > epochs that were consumed. 
The KIP-101 logic may actually be more accurate than using timestamps because it does not depend on the messages which are written after the unclean leader election. The case we're talking about should be extremely rare in practice anyway. I also agree that we may not want to add new machinery if it only helps the old message format. Ok, let's go ahead and drop the timestamp.

@Guozhang

> * My current understanding is that, with unclean leader election turned on, exactly-once is out of the window since we cannot guarantee that all committed message markers will not be lost. And hence there is no need to have special handling logic for LOG_TRUNCATED or OOR error codes with read.committed turned on. Is that right?

Yes, that's right. EoS and unclean leader election don't mix well. It may be worth considering separately whether we should try to reconcile the transaction log following an unclean leader election. At least we may be able to prevent dangling transactions from blocking consumers. This KIP does not address this problem.

> * MINOR: "if the epoch is greater than the minimum expected epoch, that the new epoch does not begin at an earlier offset than the fetch offset. In the latter case, the leader can respond with a new LOG_TRUNCATION error code": should it be "does not begin at a later offset than the fetch offset"?

I think the comment is correct, though the phrasing may be confusing. We know truncation has occurred if there exists a larger epoch with a starting offset that is lower than the fetch offset. Let me try to rephrase this.
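To make the condition Jason describes concrete (truncation has occurred if some epoch larger than the fetch epoch begins at an offset lower than the fetch offset), here is a minimal illustrative sketch against a KIP-101-style epoch cache. This is not Kafka's actual broker code; the TreeMap layout and the method name are assumptions for illustration only:

```java
import java.util.Map;
import java.util.TreeMap;

public class TruncationCheck {
    /**
     * Leader-side sketch: the fetcher's log has diverged if some epoch
     * larger than the fetch epoch began at an offset lower than the
     * fetch offset. epochStartOffsets maps leaderEpoch -> start offset.
     */
    public static boolean isLogTruncated(TreeMap<Integer, Long> epochStartOffsets,
                                         int fetchEpoch, long fetchOffset) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(fetchEpoch);
        return next != null && next.getValue() < fetchOffset;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> cache = new TreeMap<>();
        cache.put(0, 0L);   // epoch 0 starts at offset 0
        cache.put(3, 90L);  // epoch 3 starts at offset 90

        // Fetching offset 120 while still on epoch 0: epoch 3 began at
        // 90 < 120, so the fetcher's offsets from 90 on may be truncated.
        assert isLogTruncated(cache, 0, 120L);

        // Fetching offset 50 on epoch 0: epoch 3 began later, so the
        // fetch position is consistent with the leader's log.
        assert !isLogTruncated(cache, 0, 50L);
    }
}
```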
Thanks,
Jason

On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Jason, thanks for the KIP. A few comments:

* I think Dong's question about whether to use the timestamp-based approach vs. start-offset-of-first-larger-epoch is valid; more specifically, with the timestamp-based approach we may still be resetting to an offset falling into the truncated interval, and hence we may still miss some data, i.e. still not guaranteeing at-least-once. With start-offset-of-first-larger-epoch, I'm not sure if it will guarantee no valid data is missed when we have consecutive log truncations (maybe we need to look back into the details of KIP-101 to figure it out). If the latter can indeed guarantee at-least-once, we could consider using that approach.

* My current understanding is that, with unclean leader election turned on, exactly-once is out of the window since we cannot guarantee that all committed message markers will not be lost. And hence there is no need to have special handling logic for LOG_TRUNCATED or OOR error codes with read.committed turned on. Is that right?

* MINOR: "if the epoch is greater than the minimum expected epoch, that the new epoch does not begin at an earlier offset than the fetch offset. In the latter case, the leader can respond with a new LOG_TRUNCATION error code": should it be "does not begin at a later offset than the fetch offset"?

Guozhang

On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks for the explanation.
Please correct me if this is wrong. The "unknown truncation offset" scenario happens when the consumer does not have the full leaderEpoch -> offset mapping. In this case we can still use the KIP-101-based approach to truncate the offset to the "start offset of the first leader epoch larger than the last epoch of the consumer", but it may be inaccurate. So the KIP chooses the timestamp-based approach, which is also best-effort.

If this understanding is correct, then for the "closest" offset reset policy in the "unknown truncation offset" scenario, I am wondering whether it may be better to replace the timestamp-based approach with the KIP-101-based approach. In comparison to the timestamp-based approach, the KIP-101-based approach seems to simplify the API a bit since the user does not need to understand timestamps. Like the timestamp-based approach, it is best-effort and does not guarantee that the consumer can consume all messages. It is not like KIP-279, which guarantees that the follower broker can consume all messages from the leader.

Then it seems that the remaining difference is mostly about accuracy, i.e. how many messages will be duplicated or missed in the "unknown truncation offset" scenario. I am not sure either one is clearly better than the other. Note that there are two scenarios mentioned in KIP-279 which are not addressed by KIP-101. Both scenarios require quick leadership changes between brokers, which seems to suggest that the offset obtained by "start offset of the first leader epoch larger than the last epoch of the consumer" under these two scenarios may be very close to the offset obtained by the message timestamp. Does this sound reasonable?

Good point that users on the v1 message format can benefit from the timestamp-based approach. On the other hand, it seems like a short-term benefit for users who have not migrated. I am just not sure whether it is more important than designing a better API.

Also, for both the "latest" and "earliest" reset policies, do you think it would make sense to also use the KIP-101-based approach to truncate the offset in the "unknown truncation offset" scenario?
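Both sides of the discussion reference the KIP-101 rule of resetting to the start offset of the first leader epoch larger than the consumer's last epoch. The following is a minimal sketch of that rule under an assumed epoch-cache shape (leaderEpoch to start offset); the names and the -1 sentinel are illustrative, not from the KIP:

```java
import java.util.Map;
import java.util.TreeMap;

public class Kip101Reset {
    /**
     * Sketch: given the leader's epoch cache (leaderEpoch -> start offset),
     * the KIP-101-style reset point for a consumer whose last consumed batch
     * carried lastConsumedEpoch is the start offset of the first larger epoch.
     * Returns -1 if no larger epoch exists, i.e. no divergence is detected.
     */
    public static long truncationOffset(TreeMap<Integer, Long> epochStartOffsets,
                                        int lastConsumedEpoch) {
        Map.Entry<Integer, Long> first = epochStartOffsets.higherEntry(lastConsumedEpoch);
        return first != null ? first.getValue() : -1L;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> cache = new TreeMap<>();
        cache.put(0, 0L);    // epoch 0 begins at offset 0
        cache.put(2, 100L);  // epoch 2 begins at offset 100
        cache.put(5, 180L);  // an election bumped the epoch to 5 at offset 180

        // Consumer last read on epoch 2: the first larger epoch is 5, so
        // anything it read at or beyond offset 180 may have been truncated.
        assert truncationOffset(cache, 2) == 180L;

        // Consumer already on the newest epoch: no divergence to repair.
        assert truncationOffset(cache, 5) == -1L;
    }
}
```

As the thread notes, without the consumer's full epoch history this is only an approximation of the true point of divergence, which is why both this and the timestamp-based approach remain best-effort.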
Thanks,
Dong

--
-- Guozhang