Hi Dong,
What I called “not covering all use cases” is what you call best-effort (not guaranteeing some corner cases). I think we are on the same page here. I wanted to be clear in the API whether the consumer seeks to a position (offset) or to a record (offset, leader epoch). The only use case of seeking to a record is seeking to a committed offset for a user who stores committed offsets externally (unless users find some other reason to seek to a record). I thought it was possible to provide this functionality with findOffset(offset, leader epoch) followed by seek(offset). However, you are right that this will not handle the race condition where the non-divergent offset found by findOffset() could change again before the consumer does the first fetch.

Regarding position(): if we add a position() that returns (offset, leader epoch), it is specifically the position after a record that was actually consumed, or the position of a committed record. In that case, I still think it’s cleaner to get the record position of a consumed message from a new helper method in ConsumerRecords or from committed offsets.

I think all the use cases could then be covered with:

(Approach 1)
- seekToRecord(offset, leaderEpoch): this will just initialize/set the consumer state;
- findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}.

If we agree that the race condition is also a corner case, then I think we can cover the use cases with:

(Approach 2)
- findOffsets(offset, leaderEpoch) returns offset. We still want leader epoch as a parameter for the users who store their committed offsets externally.

I am actually now leaning more toward approach 1, since it is more explicit, and maybe there are more use cases for it.

Thanks,
Anna

On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Anna,

Thanks for the comment. To answer your question, it seems that we can cover all cases in this KIP.
As stated in the "Consumer Handling" section, a KIP-101 based approach will be used to derive the truncation offset from the 2-tuple (offset, leaderEpoch). This approach is best effort, and it is inaccurate only in very rare scenarios (as described in KIP-279).

By using seek(offset, leaderEpoch), the consumer will still be able to follow this best-effort approach to detect log truncation and determine the truncation offset. On the other hand, if we use seek(offset), the consumer will not detect log truncation in some cases, which weakens the guarantee of this KIP. Does this make sense?

Thanks,
Dong

On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <a...@confluent.io> wrote:

Sorry, I hit "send" before finishing. Continuing...

2) Hiding most of the consumer's log truncation handling logic, with minimal exposure in the KafkaConsumer API. I was proposing this path.

Before answering your specific questions, I want to respond to your comment “In general, maybe we should discuss the final solution that covers all cases?”. With the current KIP, we don’t cover all cases of the consumer detecting log truncation, because the KIP proposes a leader epoch cache in the consumer that does not persist across restarts. Plus, we only store the last committed offset (either internally, or users can store it externally). This has the limitation that the consumer will not always be able to find the point of truncation, simply because we have a limited history (just one data point).

So, maybe we should first agree on whether we accept that storing only the last committed offset/leader epoch has the limitation that the consumer will not be able to detect log truncation in all cases?

Thanks,
Anna

On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <a...@confluent.io> wrote:

Hi Dong,

Thanks for the follow-up! I finally have a much clearer understanding of where you are coming from.
You are right. The success of findOffsets() (finding a point of non-divergence) depends on whether we have enough entries in the consumer's leader epoch cache. However, I think this is a fundamental limitation of having a leader epoch cache that does not persist across consumer restarts.

If we consider the general case where the consumer may or may not have this cache, then I see two paths:
1) Letting the user track the leader epoch history externally, with more exposure of the leader epoch and of finding the point of non-divergence in the KafkaConsumer API. I understand this is the case you were talking about.

On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Anna,

Thanks much for your detailed explanation and example! It does help me understand the difference between our understandings.

So it seems that the solution based on findOffsets() currently focuses mainly on the scenario where the consumer has a cached leaderEpoch -> offset mapping, whereas I was thinking about the general case where the consumer may or may not have this cache. I guess that is why we understand this differently. I have some comments below.

3) The proposed solution using findOffsets(offset, leaderEpoch) followed by seek(offset) works if the consumer has the cached leaderEpoch -> offset mapping. But if we assume the consumer has this cache, do we need to have leaderEpoch in findOffsets(...)? Intuitively, findOffsets(offset) can also derive the leaderEpoch from the offset, just like the proposed solution does with seek(offset).
4) If the consumer does not have a cached leaderEpoch -> offset mapping, which is the case if the consumer is restarted on a new machine, then it is not clear what leaderEpoch would be included in the FetchRequest if the consumer does seek(offset). This is the case that motivates the first question of the previous email. In general, maybe we should discuss the final solution that covers all cases?

5) The second question in my previous email is related to the following paragraph:

"... In some cases, offsets returned from position() could be actual consumed messages by this consumer identified by {offset, leader epoch}. In other cases, position() returns offset that was not actually consumed. Suppose, the user calls position() for the last offset...".

I guess my point is that, if the user calls position() for the last offset and uses that offset in seek(...), then the user can probably just call Consumer#seekToEnd() without calling position() and seek(...). Similarly, the user can call Consumer#seekToBeginning() to seek to the earliest position without calling position() and seek(...). Thus position() only needs to return the actually consumed messages identified by {offset, leader epoch}. Does this make sense?

Thanks,
Dong

On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io> wrote:

Hi Dong,

Thanks for considering my suggestions.

Based on your comments, I realized that my suggestion was not complete with regard to the KafkaConsumer API vs. the consumer-broker protocol.
While I propose to keep KafkaConsumer#seek() unchanged and taking an offset only, the underlying consumer will send the next FetchRequest() to the broker with the offset and the leaderEpoch, if it is known (based on the leader epoch cache in the consumer). Note that this is different from the current KIP, which suggests always sending an unknown leader epoch after seek(). This way, if the consumer and a broker agreed on the point of non-divergence, which is some {offset, leaderEpoch} pair, a new leader that causes another truncation (even further back) will be able to detect the new divergence and restart the process of finding the new point of non-divergence. So, to answer your question: if the truncation happens just after the user calls KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset), the user will not seek to the wrong position without knowing that truncation has happened, because the consumer will get another truncation error and seek again.

I am afraid I did not understand your second question. Let me summarize my suggestions again, and then give an example to hopefully make my suggestions clearer. Also, the last part of my example shows how the use case in your first question will work. If it does not answer your second question, would you mind clarifying? I am also focusing on the case of a consumer having enough entries in the cache. The case of restarting from a committed offset, stored either externally or internally, will probably need to be discussed more.
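The consumer-side lookup described above can be sketched as follows. This is a minimal illustration, assuming the cache is a sorted map from epoch start offset to leader epoch plus an end offset; LeaderEpochCache, onConsumed() and epochForFetch() are hypothetical names, not part of KafkaConsumer. Following the FetchRequests in the example later in this message, the epoch sent with a fetch at offset N is taken to be the epoch of the record at N - 1, and it is reported as unknown when the cache does not cover that record.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical consumer-side leader epoch cache (not the KafkaConsumer API):
// epoch start offsets plus the end offset (one past the last consumed record).
class LeaderEpochCache {
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();
    private long endOffset = 0;

    // Record a consumed message; a new epoch gets an entry at its first offset.
    void onConsumed(long offset, int leaderEpoch) {
        Map.Entry<Long, Integer> last = epochByStartOffset.lastEntry();
        if (last == null || last.getValue() != leaderEpoch) {
            epochByStartOffset.put(offset, leaderEpoch);
        }
        endOffset = offset + 1;
    }

    // Leader epoch to send with FetchRequest(fetchOffset): the epoch of the
    // record just before fetchOffset if the cache knows it, otherwise empty
    // (the "unknown leader epoch" case, e.g. after seeking past the cache).
    OptionalInt epochForFetch(long fetchOffset) {
        long previous = fetchOffset - 1;
        if (previous < 0 || previous >= endOffset) {
            return OptionalInt.empty();
        }
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(previous);
        return entry == null ? OptionalInt.empty() : OptionalInt.of(entry.getValue());
    }

    long endOffset() {
        return endOffset;
    }
}
```

Fed the three messages consumed in that example ({0, 0}, {1, 1}, {2, 1}), this cache ends at offset 3, and epochForFetch() gives epoch 1 for fetch offsets 3 and 2 and epoch 0 for fetch offset 1, matching the FetchRequests the example describes.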
Let me summarize my suggestion again:

1) KafkaConsumer#seek() and KafkaConsumer#position() remain unchanged.
2) A new KafkaConsumer#findOffsets() takes an {offset, leaderEpoch} pair per topic partition and returns an offset per topic partition.
3) The FetchRequest() to the broker after KafkaConsumer#seek() will contain the offset set by seek and the leaderEpoch that corresponds to that offset, based on the leader epoch cache in the consumer.

The rest of this e-mail is a long and contrived example with several log truncations and unclean leader elections, to illustrate the API and your first use case. Suppose we have three brokers. Initially, brokers A, B, and C each have one message at offset 0 with leader epoch 0. Then, broker A goes down for some time. Broker B becomes the leader with epoch 1, and writes messages to offsets 1 and 2. Broker C fetches offset 1, but before fetching offset 2, becomes the leader with leader epoch 2 and writes a message at offset 2. Here is the state of the brokers at this point:

  Broker A:
    offset 0, epoch 0 <- leader
    goes down…

  Broker B:
    offset 0, epoch 0
    offset 1, epoch 1 <- leader
    offset 2, epoch 1

  Broker C:
    offset 0, epoch 0
    offset 1, epoch 1
    offset 2, epoch 2 <- leader

Before broker C becomes the leader with leader epoch 2, the consumer consumed the following messages from broker A and broker B:

{offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2, leaderEpoch=1}.
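The broker logs in this example can be replayed with a small model. The sketch below assumes each log is just a list of leader epochs indexed by offset, and that OffsetsForLeaderEpoch answers with the largest epoch not greater than the requested one, plus the offset where that epoch ends (the KIP-101 style behavior the thread relies on). TruncationExample, offsetsForLeaderEpoch(), diverged() and findOffset() are hypothetical helpers, not Kafka APIs; findOffset() takes the minimum of the requested offset, the broker's end offset, and the consumer's own end offset for the returned epoch.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical model of the example: each log is a list of leader epochs,
// indexed by offset. Names are illustrative, not Kafka APIs.
class TruncationExample {
    record EpochEndOffset(int leaderEpoch, long endOffset) {}

    // Broker side: the largest epoch <= requestedEpoch present in the log and
    // the offset where it ends; (-1, -1) if no such epoch exists.
    static EpochEndOffset offsetsForLeaderEpoch(List<Integer> log, int requestedEpoch) {
        int found = -1;
        for (int epoch : log) {
            if (epoch <= requestedEpoch && epoch > found) {
                found = epoch;
            }
        }
        if (found < 0) {
            return new EpochEndOffset(-1, -1L);
        }
        // Epochs never decrease along a log, so the epoch ends where the
        // first record with a larger epoch starts (or at the log end).
        for (int offset = 0; offset < log.size(); offset++) {
            if (log.get(offset) > found) {
                return new EpochEndOffset(found, offset);
            }
        }
        return new EpochEndOffset(found, log.size());
    }

    // Broker side: a fetch at (fetchOffset, fetchEpoch) has diverged if that
    // epoch ends before fetchOffset in this log (an unknown epoch also counts).
    static boolean diverged(List<Integer> log, long fetchOffset, int fetchEpoch) {
        return offsetsForLeaderEpoch(log, fetchEpoch).endOffset() < fetchOffset;
    }

    // Consumer side: findOffsets(offset, epoch) for one partition, combining the
    // broker response with the consumer's own history of consumed epochs.
    static OptionalLong findOffset(long offset, int epoch,
                                   List<Integer> brokerLog, List<Integer> consumed) {
        EpochEndOffset broker = offsetsForLeaderEpoch(brokerLog, epoch);
        if (broker.leaderEpoch() < 0) {
            return OptionalLong.empty(); // no common epoch: best effort fails
        }
        EpochEndOffset local = offsetsForLeaderEpoch(consumed, broker.leaderEpoch());
        if (local.leaderEpoch() != broker.leaderEpoch()) {
            return OptionalLong.empty(); // the consumer never saw this epoch
        }
        return OptionalLong.of(Math.min(offset, Math.min(broker.endOffset(), local.endOffset())));
    }
}
```

With brokerC = [0, 1, 2], brokerA = [0, 3] (after broker A becomes leader with epoch 3 and writes offset 1) and the consumer's history [0, 1, 1], the model reproduces the steps that follow: a fetch at (3, 1) has diverged on broker C and findOffset() returns 2; a fetch at (2, 1) has diverged on broker A and findOffset() returns 1.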
The consumer’s leader epoch cache at this point contains the following entries:

(leaderEpoch=0, startOffset=0)
(leaderEpoch=1, startOffset=1)
endOffset = 3

Then, broker B becomes a follower of broker C, truncates, and starts fetching from offset 2.

The consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a LOG_TRUNCATION error from broker C.

In response, the client calls KafkaConsumer#findOffsets(offset=3, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker C responds with {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3, leaderEpoch=1) returns offset=2.

In response, the consumer calls KafkaConsumer#seek(offset=2) followed by poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker C.

I will continue with this example with the goal of answering your first question about truncation just after findOffsets() followed by seek():

Suppose brokers B and C go down, and broker A comes up, becomes the leader with leader epoch 3, and writes a message to offset 1. Suppose this happens before the consumer gets a response from broker C to the previous fetch request, FetchRequest(offset=2, leaderEpoch=1).

The consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which returns a LOG_TRUNCATION error: broker A has leader epoch 3, which is greater than the leader epoch in the FetchRequest, and epoch 3 starts at offset 1, which is less than offset 2 in the FetchRequest().

In response, the user calls KafkaConsumer#findOffsets(offset=2, leaderEpoch=1).
The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker A responds with {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch=0 in its cache with end offset == 1, which results in KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset=1.

In response, the user calls KafkaConsumer#seek(offset=1) followed by poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to broker A, which responds with the message at offset 1, leader epoch 3.

I will think some more about consumers restarting from committed offsets, and send a follow-up.

Thanks,
Anna

On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:

Hey Anna,

Thanks much for the thoughtful reply. It makes sense to differentiate between "seeking to a message" and "seeking to a position". I have two questions here:

- For the "seeking to a message" use case, with the proposed approach the user needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If message truncation and message append happen immediately after findOffset(offset, leaderEpoch) but before seek(offset), it seems that the user will seek to the wrong message without knowing the truncation has happened. Would this be a problem?

- For the "seeking to a position" use case, it seems that there can be two positions, i.e. earliest and latest. So these two cases can be fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd().
Then it seems that the user will only need to call position() and seek() for the "seeking to a message" use case?

Thanks,
Dong

On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> wrote:

Hi Jason and Dong,

I’ve been thinking about your suggestions and the discussion regarding position(), seek(), and the newly proposed API.

Here is my thought process on why we should keep the position() and seek() APIs unchanged.

I think we should separate the {offset, leader epoch} pair that uniquely identifies a message from an offset that is a position. In some cases, the offset returned from position() could correspond to a message actually consumed by this consumer, identified by {offset, leader epoch}. In other cases, position() returns an offset that was not actually consumed. Suppose the user calls position() for the last offset, and suppose we return the {offset, leader epoch} of the message currently in the log. Then, the message gets truncated before the consumer’s first poll(). It does not make sense for poll() to fail in this case, because the log truncation did not actually happen from the consumer's perspective. On the other hand, as the KIP proposes, it makes sense for the committed() method to return {offset, leader epoch}, because those offsets represent actually consumed messages.

The same argument applies to the seek() method: we are not seeking to a message, we are seeking to a position.
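To make the position/record distinction concrete, here is a minimal sketch with a hypothetical per-partition state holder. PartitionState is an illustrative name, and its nested OffsetAndEpoch only mirrors the struct proposed later in this thread; neither is an existing Kafka class. seek() moves a bare offset, while an {offset, leader epoch} pair is recorded only when a record is actually consumed.

```java
import java.util.Optional;

// Hypothetical per-partition state: "position" is a plain offset that seek()
// or consumption can set, while an {offset, leader epoch} pair is recorded
// only for a message the consumer actually consumed.
class PartitionState {
    record OffsetAndEpoch(long offset, int leaderEpoch) {}

    private long position;
    private Optional<OffsetAndEpoch> lastConsumed = Optional.empty();

    // seek() moves the position without claiming anything about a message.
    void seek(long offset) {
        position = offset;
    }

    // Consuming a record is the only way an {offset, leader epoch} is recorded.
    void onConsumed(long offset, int leaderEpoch) {
        lastConsumed = Optional.of(new OffsetAndEpoch(offset, leaderEpoch));
        position = offset + 1;
    }

    long position() {
        return position;
    }

    Optional<OffsetAndEpoch> lastConsumed() {
        return lastConsumed;
    }
}
```

After seek(5) the position is 5 but no record identity exists; consuming offset 5 in epoch 2 moves the position to 6 and records {5, 2}.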
I like the proposal to add a KafkaConsumer#findOffsets() API. I am assuming something like:

Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)

Similar to seek() and position(), I think findOffsets() should return an offset without a leader epoch, because what we want is the offset that we think is the closest non-divergent one to the given consumed message. Until the consumer actually fetches the message, we should not let the consumer store the leader epoch for a message it did not consume.

So, the workflow will be:

1) The user gets LogTruncationException with {offset, leader epoch of the previous message} (whatever we send with the new FetchRecords request).
2) offset = findOffsets(tp -> {offset, leader epoch})
3) seek(offset)

For the use case where the users store committed offsets externally:

1) Such users would have to track the leader epoch together with an offset. Otherwise, there is no way to detect later what leader epoch was associated with the message. I think it’s reasonable to ask that of users if they want to detect log truncation. Otherwise, they will get the current behavior.

If the users currently get the offset to be stored using position(), I see two possibilities. First, they save the offset returned from a position() call that they make before poll().
In that case, it would not be correct to store {offset, leader epoch} if we had changed position() to return {offset, leader epoch}, since the actually fetched message could be different (as in the example I described earlier). So, it would be more correct to call position() after poll(). However, the user already has ConsumerRecords at this point, from which the user can extract the {offset, leader epoch} of the last message.

So, I like the idea of adding a helper method to ConsumerRecords, as Jason proposed, something like:

public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is a data struct holding {offset, leader epoch}.

In this case, we would advise the user to follow this workflow: poll(), get {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(), save the offset and leader epoch, process the records.

2) When the user needs to seek to the last committed offset, they call the new findOffsets(saved offset, leader epoch), and then seek(offset).

What do you think?

Thanks,
Anna

On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks much for your thoughtful explanation.

Yes, the solution using findOffsets(offset, leaderEpoch) also works. The advantage of this solution is that it adds only one API instead of two.
The concern is that its usage seems a bit more clumsy for advanced users. More specifically, advanced users who store offsets externally will always need to call findOffsets() before calling seek(offset) during consumer initialization. And those advanced users will need to manually keep track of the leaderEpoch of the last ConsumerRecord.

The other solution, which may be more user-friendly for advanced users, is to add two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) = offsetEpochs(topicPartition)`.

I kind of prefer the second solution because it is easier to use for advanced users. If we need to expose leaderEpoch anyway to safely identify a message, it may be conceptually simpler to expose it directly in seek(...) rather than requiring one more translation using findOffsets(...). But I am also OK with the first solution if other developers also favor that one :)

Thanks,
Dong

On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Dong,

Thanks, I've been thinking about your suggestions a bit. It is challenging to make this work given the current APIs. One of the difficulties is that we don't have an API to find the leader epoch for a given offset at the moment.
So if the user does a seek to offset 5, then we'll need a new API to find the corresponding epoch in order to fulfill the new position() API. Potentially we could modify ListOffsets to enable finding the leader epoch, but I am not sure it is worthwhile. Perhaps it is reasonable for advanced usage to expect that the epoch information, if needed, will be extracted from the records directly? It might make sense to expose a helper in `ConsumerRecords` to make this a little easier though.

Alternatively, if we think it is important to have this information exposed directly, we could create batch APIs to solve the naming problem. For example:

  Map<TopicPartition, OffsetAndEpoch> positions();
  void seek(Map<TopicPartition, OffsetAndEpoch> positions);

However, I'm actually leaning toward leaving the seek() and position() APIs unchanged. Instead, we can add a new API to search for offset by timestamp or by offset/leader epoch. Let's say we call it `findOffsets`. If the user hits a log truncation error, they can use this API to find the closest offset and then do a seek(). At the same time, we deprecate the `offsetsForTimes` APIs. We now have two use cases which require finding offsets, so I think we should make this API general and leave the door open for future extensions.
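The findOffsets-then-seek flow Jason outlines here can be sketched as below. Everything in it is hypothetical: Partition stands in for TopicPartition, OffsetAndEpoch is the struct named in this thread, and TruncationAwareConsumer, TruncationHandler and the toy InMemoryConsumer are illustrative only. The findOffsets() signature follows the one Anna assumes later in the thread. The toy implementation simply never returns an offset past a fixed per-partition "safe" offset, in place of the real OffsetsForLeaderEpoch round trip.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record Partition(String topic, int partition) {}

// Hypothetical {offset, leader epoch} pair, named after the struct in this thread.
record OffsetAndEpoch(long offset, int leaderEpoch) {}

// Hypothetical subset of a consumer API with the proposed findOffsets().
interface TruncationAwareConsumer {
    // For each partition, the closest offset believed not to have diverged.
    Map<Partition, Long> findOffsets(Map<Partition, OffsetAndEpoch> offsetsToSearch);

    void seek(Partition partition, long offset);
}

class TruncationHandler {
    // Called when a log truncation error reports {offset, leader epoch} per
    // partition: find the closest non-divergent offsets, then seek to them.
    static void onLogTruncation(TruncationAwareConsumer consumer,
                                Map<Partition, OffsetAndEpoch> divergent) {
        Map<Partition, Long> safeOffsets = consumer.findOffsets(divergent);
        safeOffsets.forEach((partition, offset) -> consumer.seek(partition, offset));
    }
}

// Toy implementation: each partition has a fixed "safe" offset that
// findOffsets() never moves past (no broker round trip is modeled).
class InMemoryConsumer implements TruncationAwareConsumer {
    final Map<Partition, Long> safeEnd = new HashMap<>();
    final Map<Partition, Long> positions = new HashMap<>();

    @Override
    public Map<Partition, Long> findOffsets(Map<Partition, OffsetAndEpoch> offsetsToSearch) {
        Map<Partition, Long> result = new HashMap<>();
        offsetsToSearch.forEach((partition, requested) -> result.put(partition,
                Math.min(requested.offset(), safeEnd.getOrDefault(partition, requested.offset()))));
        return result;
    }

    @Override
    public void seek(Partition partition, long offset) {
        positions.put(partition, offset);
    }
}
```

For example, with a safe offset of 2 for partition ("events", 0), handling a truncation reported at {offset=3, leaderEpoch=1} leaves the toy consumer positioned at offset 2.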
By the way, I'm unclear about the desire to move part of this functionality to AdminClient. Guozhang suggested this previously, but I think it only makes sense for cross-cutting capabilities such as topic creation. If we have an API which is primarily useful to consumers, then I think that's where it should be exposed. The AdminClient also has its own API integrity and should not become a dumping ground for advanced use cases. I'll update the KIP with the `findOffsets` API suggested above, and we can see if it does a good enough job of keeping the API simple for common cases.

Thanks,
Jason

On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Regarding seek(...), it seems that we want an API for the user to initialize the consumer with (offset, leaderEpoch), and that API should allow throwing PartitionTruncationException. Suppose we agree on this; then seekToNearest() is not sufficient, because it will always swallow PartitionTruncationException. Here we have two options. The first option is to add an API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to offset. The second option is to add seek(offset, leaderEpoch).
It seems that the second option may be simpler, because it makes it clear that (offset, leaderEpoch) will be used to identify the consumer's position in a partition, and the user only needs to handle PartitionTruncationException from poll(). In comparison, the first option seems a bit harder to use, because the user also has to handle the PartitionTruncationException if offsetsForLeaderEpochs() returns a different offset from the user-provided offset. What do you think?

If we decide to add the API seek(offset, leaderEpoch), then we can decide whether and how to add an API to translate (offset, leaderEpoch) to offset. It seems that this API will be needed by advanced users who don't want auto offset reset (so that they can be notified) but still want to reset the offset to the closest one. For those users, it probably makes sense to only have the API in AdminClient. offsetsForTimes() seems like a common API that will be needed by users of the consumer in general, so it may be more reasonable for it to stay in the consumer API. I don't have a strong opinion on whether offsetsForTimes() should be replaced by an API in AdminClient.
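The extra branch attributed here to the first option can be sketched as below, assuming the user already has the offset that a translation call such as the proposed offsetsForLeaderEpochs() returned for the stored pair. OptionOneRestore, offsetToSeek() and TruncationDetected are hypothetical names, and the exception only stands in for the proposed PartitionTruncationException. Under the second option, the equivalent comparison would happen inside the consumer and surface from poll().

```java
// Hypothetical sketch of the user-side check under "option 1": the stored
// (offset, leaderEpoch) pair has already been translated to an offset by a
// call like the proposed offsetsForLeaderEpochs(); the user must compare.
class OptionOneRestore {
    // Stand-in for the proposed PartitionTruncationException.
    static class TruncationDetected extends RuntimeException {
        final long storedOffset;
        final long translatedOffset;

        TruncationDetected(long storedOffset, long translatedOffset) {
            super("log truncated: stored offset " + storedOffset
                    + ", closest non-divergent offset " + translatedOffset);
            this.storedOffset = storedOffset;
            this.translatedOffset = translatedOffset;
        }
    }

    // Returns the offset to pass to seek(), or throws when the user wants to be
    // notified of truncation instead of silently moving back.
    static long offsetToSeek(long storedOffset, long translatedOffset, boolean failOnTruncation) {
        if (translatedOffset == storedOffset) {
            return storedOffset; // no divergence detected
        }
        if (failOnTruncation) {
            throw new TruncationDetected(storedOffset, translatedOffset);
        }
        return translatedOffset; // reset to the closest non-divergent offset
    }
}
```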
Though (offset, leaderEpoch) is needed to uniquely identify a message in general, it is only needed for advanced users who have turned on unclean leader election, need to use seek(..), and don't want auto offset reset. Most other users probably just want to enable auto offset reset and store offsets in Kafka. Thus we might want to keep the existing offset-only APIs (e.g. seek() and position()) for most users while adding new APIs for advanced users. And yes, it seems that we need a new name for position().

Though I think we need new APIs to carry the new information (e.g. leaderEpoch), I am not very sure what they should look like. One possible option is the APIs in KIP-232. Another option is something like this:

```
class OffsetEpochs {
    long offset;
    int leaderEpoch;
    int partitionEpoch; // This may be needed later, as discussed in KIP-232.
    // Hopefully these are all we need to identify a message in Kafka. But if
    // we need more, then we can add new fields to this class.
}

// Proposed consumer methods:
OffsetEpochs offsetEpochs(TopicPartition partition);

void seek(TopicPartition partition, OffsetEpochs offsetEpochs);
```

Thanks,
Dong

On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Dong,

Thanks for the feedback. The first three points are easy:

1. Yes, we should be consistent.
2. Yes, I will add this.
3. Yes, I think we should document the changes to the committed offset schema. I meant to do this, but it slipped my mind.

The latter questions are tougher. One option I was considering is to have only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new seek() API. That seems more consistent with the current use of `offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An alternative might be to take a page from the AdminClient API and add a new method to generalize offset lookup. For example, we could have `lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes`, and this would open the door for future extensions without needing new APIs.
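A generalized lookup in the spirit of `lookupOffsets(LookupOptions)` might look like the sketch below. It assumes Java 17 sealed interfaces and records; LookupOption, ByTimestamp, ByOffsetAndEpoch, LogEntry and OffsetLookup are hypothetical names, and the lookup runs against an in-memory log rather than a broker. The timestamp case mirrors offsetsForTimes() (earliest offset whose timestamp is at or after the target); the offset/epoch case returns the requested offset capped at the end of the largest leader epoch not greater than the requested one.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical generalized offset lookup; these types are illustrative and
// not part of the Kafka client.
sealed interface LookupOption permits ByTimestamp, ByOffsetAndEpoch {}

record ByTimestamp(long timestampMs) implements LookupOption {}

record ByOffsetAndEpoch(long offset, int leaderEpoch) implements LookupOption {}

// One record of an in-memory log: its offset, leader epoch and timestamp.
record LogEntry(long offset, int leaderEpoch, long timestampMs) {}

class OffsetLookup {
    static OptionalLong lookup(List<LogEntry> log, LookupOption option) {
        if (option instanceof ByTimestamp t) {
            // Earliest offset whose timestamp is >= the target.
            for (LogEntry e : log) {
                if (e.timestampMs() >= t.timestampMs()) {
                    return OptionalLong.of(e.offset());
                }
            }
            return OptionalLong.empty();
        }
        if (option instanceof ByOffsetAndEpoch oe) {
            // End of the largest epoch <= the requested one (epochs never
            // decrease along the log), capped at the requested offset.
            long end = -1;
            for (LogEntry e : log) {
                if (e.leaderEpoch() <= oe.leaderEpoch()) {
                    end = e.offset() + 1;
                }
            }
            return end < 0 ? OptionalLong.empty() : OptionalLong.of(Math.min(oe.offset(), end));
        }
        throw new IllegalArgumentException("unknown lookup option: " + option);
    }
}
```

A new kind of lookup would then be one more record implementing LookupOption rather than a new consumer method, which is the extensibility argument made above.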
The case of position() is a little more annoying. It would have been better had we let this return an object so that it is easier to extend. This is the only reason I didn't add the API to the KIP. Maybe we should bite the bullet and fix this now? Unfortunately we'll have to come up with a new name. Maybe `currentPosition`?

Thoughts?

-Jason


On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <lindon...@gmail.com> wrote:

Regarding points 4) and 5) above, the motivation for the alternative APIs is that, if we decide that leaderEpoch is as important as offset in identifying a message, then it may be reasonable to always specify it wherever an offset is currently required in the consumer API to identify a message, e.g. position() and seek(). For example, since we allow users to retrieve the offset using position() instead of asking them to keep track of the offset of the latest ConsumerRecord, maybe it would be more consistent to let users also retrieve the leaderEpoch using position()?
On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks for the update. It looks pretty good. Just some minor comments below:

1) The KIP adds a new error code "LOG_TRUNCATION" and a new exception TruncatedPartitionException. Can we make the names more consistent, e.g. LogTruncationException?

2) Do we need to add UnknownLeaderEpochException as part of the API change?

3) Not sure if the offset topic schema is also public API. If so, maybe we should also include the schema change in the API?

4) For users who store offsets externally, currently they get the offset using position(..), store the offset externally, and use seek(..) to initialize the consumer next time. After this KIP they will need to store and use the leaderEpoch together with the offset. Should we also update the API so that users can also get the leaderEpoch from position(...)? Not sure if it is OK to ask users to track the latest leaderEpoch of ConsumerRecord by themselves.

5) Also, users who store offsets externally need to call seek(..) with the leaderEpoch to initialize the consumer. With the current KIP, users need to call seekToNearest(), whose name suggests that the final position may be different from what was requested. However, if users want to avoid auto offset reset and be notified explicitly when there is log truncation, then seekToNearest() probably does not help here. Would it make sense to replace seekToNearest() with seek(offset, leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?


Thanks,
Dong


On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Guozhang,

That's fair. In fact, perhaps we do not need this API at all. We already have the new seek() in this KIP which can do the lookup based on epoch for this use case. I guess we should probably call it seekToNearest() though to make it clear that the final position may be different from what was requested.

Thanks,
Jason

On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Jason,

I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs, since probably only very advanced users are aware of the leaderEpoch, and hence would ever care to use it anyway. It is more like an admin client operation than a consumer client operation: if the motivation is to facilitate a customized reset policy, maybe adding it as AdminClient#offsetsForLeaderEpochs is better, as it is not an aggressive assumption that such advanced users are willing to use an admin client to get further information?
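The `offsetsForLeaderEpochs` lookup discussed here (Jason's proposed signature appears in his 2:07 PM message quoted below) could be modeled roughly as follows. This is a sketch under stated assumptions, not the broker's or consumer's actual implementation: it assumes KIP-101-style semantics where the answer for an epoch is that epoch's end offset (the start offset of the next larger epoch, or the log end offset for the latest epoch) and, as we read KIP-279, falls back to the largest known epoch below an unknown one. `OffsetAndEpoch` here is a local stand-in, `EpochCache` and `EpochLookup` are hypothetical, and `String` keys stand in for `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Local stand-in for the proposed OffsetAndEpoch result type (hypothetical).
final class OffsetAndEpoch {
    final long offset;
    final int leaderEpoch;

    OffsetAndEpoch(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    @Override
    public String toString() {
        return "(" + offset + ", " + leaderEpoch + ")";
    }
}

// Toy leader epoch cache for one partition: epoch -> start offset of that epoch.
final class EpochCache {
    static final long UNDEFINED_OFFSET = -1L; // sentinel used by this sketch
    static final int UNDEFINED_EPOCH = -1;

    private final TreeMap<Integer, Long> startOffsets = new TreeMap<>();
    private final long logEndOffset;

    EpochCache(Map<Integer, Long> startOffsets, long logEndOffset) {
        this.startOffsets.putAll(startOffsets);
        this.logEndOffset = logEndOffset;
    }

    // Returns (end offset, epoch) for the largest known epoch <= `epoch`.
    // The end offset is the start offset of the next larger epoch, or the log
    // end offset if that epoch is the latest one in the cache.
    OffsetAndEpoch endOffsetFor(int epoch) {
        Map.Entry<Integer, Long> floor = startOffsets.floorEntry(epoch);
        if (floor == null) {
            // Requested epoch is older than anything in the cache.
            return new OffsetAndEpoch(UNDEFINED_OFFSET, UNDEFINED_EPOCH);
        }
        Map.Entry<Integer, Long> next = startOffsets.higherEntry(floor.getKey());
        long endOffset = (next == null) ? logEndOffset : next.getValue();
        return new OffsetAndEpoch(endOffset, floor.getKey());
    }
}

final class EpochLookup {
    // Same shape as the proposed call; String stands in for TopicPartition.
    // Partitions with no known cache are left out of the result.
    static Map<String, OffsetAndEpoch> offsetsForLeaderEpochs(
            Map<String, EpochCache> caches, Map<String, Integer> epochsToSearch) {
        Map<String, OffsetAndEpoch> result = new HashMap<>();
        for (Map.Entry<String, Integer> entry : epochsToSearch.entrySet()) {
            EpochCache cache = caches.get(entry.getKey());
            if (cache != null) {
                result.put(entry.getKey(), cache.endOffsetFor(entry.getValue()));
            }
        }
        return result;
    }
}
```

For a cache of {0: 0, 2: 50, 5: 120} with log end offset 200, asking for epoch 2 or the unknown epoch 3 both yield (120, 2), and asking for epoch 5 yields (200, 5).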
Guozhang


On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <ja...@confluent.io> wrote:

Thanks for the feedback. I've updated the KIP. Specifically, I removed the "closest" reset option and the proposal to reset by timestamp when the precise truncation point cannot be determined. Instead, I proposed that we always reset using the nearest epoch when a reset policy is defined (either "earliest" or "latest"). Does that sound reasonable?

One thing I am still debating is whether it would be better to have a separate API to find the closest offset using the leader epoch. In the current KIP, I suggested piggybacking this information on an exception, but I'm beginning to think it would be better not to hide the lookup. It is awkward to implement since it means delaying the exception, and the API may actually be useful when customizing reset logic if no auto reset policy is defined. I was thinking we can add an API like the following:

Map<TopicPartition, OffsetAndEpoch> offsetsForLeaderEpochs(Map<TopicPartition, Integer> epochsToSearch)

Thoughts?

-Jason


On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson <ja...@confluent.io> wrote:

@Dong

Those are fair points. Both approaches require some fuzziness to reset the offset in these pathological scenarios, and we cannot guarantee at-least-once delivery either way unless we have the full history of leader epochs that were consumed. The KIP-101 logic may actually be more accurate than using timestamps because it does not depend on the messages which are written after the unclean leader election. The case we're talking about should be extremely rare in practice anyway. I also agree that we may not want to add new machinery if it only helps the old message format. Ok, let's go ahead and drop the timestamp.

@Guozhang

> * My current understanding is that, with unclean leader election turned on, exactly-once is out of the window since we cannot guarantee that all committed message markers will not be lost. And hence there is no need to have special handling logic for LOG_TRUNCATED or OOR error codes with read.committed turned on. Is that right?
Yes, that's right. EoS and unclean leader election don't mix well. It may be worth considering separately whether we should try to reconcile the transaction log following an unclean leader election. At least we may be able to prevent dangling transactions from blocking consumers. This KIP does not address this problem.

> * MINOR: "if the epoch is greater than the minimum expected epoch, that the new epoch does not begin at an earlier offset than the fetch offset. In the latter case, the leader can respond with a new LOG_TRUNCATION error code" should it be "does not begin at a later offset than the fetch offset"?

I think the comment is correct, though the phrasing may be confusing. We know truncation has occurred if there exists a larger epoch with a starting offset that is lower than the fetch offset. Let me try to rephrase this.

Thanks,
Jason

On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Jason, thanks for the KIP. A few comments:

* I think Dong's question about whether to use the timestamp-based approach vs. start-offset-of-first-larger-epoch is valid; more specifically, with the timestamp-based approach we may still be resetting to an offset falling into the truncated interval, and hence we may still miss some data, i.e. still not guarantee at-least-once. With the start-offset-of-first-larger-epoch, I'm not sure if it will guarantee that no valid data is missed when we have consecutive log truncations (maybe we need to look back into the details of KIP-101 to figure it out). If the latter can indeed guarantee at-least-once, we could consider using that approach.

* My current understanding is that, with unclean leader election turned on, exactly-once is out of the window since we cannot guarantee that all committed message markers will not be lost. And hence there is no need to have special handling logic for LOG_TRUNCATED or OOR error codes with read.committed turned on. Is that right?
* MINOR: "if the epoch is greater than the minimum expected epoch, that the new epoch does not begin at an earlier offset than the fetch offset. In the latter case, the leader can respond with a new LOG_TRUNCATION error code" should it be "does not begin at a later offset than the fetch offset"?


Guozhang


On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks for the explanation.

Please correct me if this is wrong. The "unknown truncation offset" scenario happens when the consumer does not have the full leaderEpoch -> offset mapping. In this case we can still use the KIP-101-based approach to truncate the offset to the "start offset of the first Leader Epoch larger than last epoch of the consumer", but it may be inaccurate. So the KIP chooses to use the timestamp-based approach, which is also best-effort.

If this understanding is correct, for the "closest" offset reset policy and the "unknown truncation offset" scenario, I am wondering whether it may be better to replace the timestamp-based approach with the KIP-101-based approach. In comparison to the timestamp-based approach, the KIP-101-based approach seems to simplify the API a bit since users do not need to understand timestamps. Like the timestamp-based approach, it is best-effort and does not guarantee that the consumer can consume all messages. It is not like KIP-279, which guarantees that a follower broker can consume all messages from the leader.

Then it seems that the remaining difference is mostly about accuracy, i.e. how many messages will be duplicated or missed in the "unknown truncation offset" scenario. Not sure either one is clearly better than the other. Note that there are two scenarios mentioned in KIP-279 which are not addressed by KIP-101. Both scenarios require quick leadership changes between brokers, which seems to suggest that the offset obtained by "start offset of the first Leader Epoch larger than last epoch of the consumer" under these two scenarios may be very close to the offset obtained by the message timestamp. Does this sound reasonable?

Good point that users on the v1 format can benefit from the timestamp-based approach. On the other hand, it seems like a short-term benefit for users who have not migrated. I am just not sure whether it is more important than designing a better API.
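The KIP-101-based rule discussed in this thread (Jason: truncation has occurred if the leader has an epoch larger than the consumer's last epoch that starts at an offset lower than the fetch offset; Dong: truncate to the start offset of the first leader epoch larger than the consumer's last epoch) can be sketched in a few lines of Java. `TruncationCheck` is a hypothetical helper, and the leader epoch cache is modeled as a plain `TreeMap` from epoch to start offset; this is an illustration under those assumptions, not Kafka's implementation.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical helper; the leader epoch cache is modeled as epoch -> start offset.
final class TruncationCheck {
    private TruncationCheck() {}

    // Returns the offset where the consumer's log diverged from the leader's,
    // or empty if no truncation is detected.
    //   consumerEpoch: leader epoch of the last record the consumer fetched
    //   fetchOffset:   the next offset the consumer wants to fetch
    static OptionalLong truncationOffset(TreeMap<Integer, Long> leaderEpochStarts,
                                         int consumerEpoch, long fetchOffset) {
        // Start offsets never decrease as epochs grow, so if any larger epoch
        // starts below fetchOffset, the first larger epoch does too.
        Map.Entry<Integer, Long> firstLarger = leaderEpochStarts.higherEntry(consumerEpoch);
        if (firstLarger != null && firstLarger.getValue() < fetchOffset) {
            // The consumer read past the point where a newer epoch began:
            // reset to the start offset of the first larger epoch.
            return OptionalLong.of(firstLarger.getValue());
        }
        return OptionalLong.empty();
    }
}
```

For example, if the leader's cache is {0: 0, 3: 80, 4: 150} and the consumer last fetched in epoch 2 with a fetch offset of 100, epoch 3 began at offset 80 < 100, so the sketch reports truncation at offset 80; with a fetch offset of 70 it reports none. Because it relies on a single (offset, leaderEpoch) data point, it shares the best-effort limitation discussed in this thread.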
Also, for both the "latest" and "earliest" reset policies, do you think it would make sense to also use the KIP-101-based approach to truncate the offset for the "unknown truncation offset" scenario?


Thanks,
Dong


--
-- Guozhang


--
-- Guozhang