Hey Dong,

Thanks for the feedback. The first three points are easy:
1. Yes, we should be consistent.
2. Yes, I will add this.
3. Yes, I think we should document the changes to the committed offset schema. I meant to do this, but it slipped my mind.

The latter questions are tougher. One option I was considering is to have only `offsetsForLeaderEpochs` exposed from the consumer and to drop the new seek() API. That seems more consistent with the current use of `offsetsForTimes` (we don't have a separate `seekToTimestamp` API). An alternative might be to take a page from the AdminClient API and add a new method to generalize offset lookup. For example, we could have `lookupOffsets(LookupOptions)`. We could then deprecate `offsetsForTimes`, and this would open the door for future extensions without needing new APIs.

The case of position() is a little more annoying. It would have been better had we let this return an object so that it is easier to extend. This is the only reason I didn't add the API to the KIP. Maybe we should bite the bullet and fix this now? Unfortunately, we'll have to come up with a new name. Maybe `currentPosition`?

Thoughts?

-Jason

On Fri, Jun 29, 2018 at 10:40 AM, Dong Lin <lindon...@gmail.com> wrote:

> Regarding points 4) and 5) above, the motivation for the alternative APIs is that, if we decide that the leaderEpoch is as important as the offset in identifying a message, then it may be reasonable to always specify it wherever an offset is currently required in the consumer API to identify a message, e.g. position() and seek(). For example, since we allow users to retrieve the offset using position() instead of asking them to keep track of the offset of the latest ConsumerRecord, maybe it would be more consistent to let users also retrieve the leaderEpoch using position()?
>
> On Fri, Jun 29, 2018 at 10:30 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jason,
> >
> > Thanks for the update. It looks pretty good.
> > Just some minor comments below:
> >
> > 1) The KIP adds a new error code "LOG_TRUNCATION" and a new exception TruncatedPartitionException. Can we make the names more consistent, e.g. LogTruncationException?
> >
> > 2) Do we need to add UnknownLeaderEpochException as part of the API change?
> >
> > 3) Not sure if the offset topic schema is also a public API. If so, maybe we should also include the schema change in the API?
> >
> > 4) Users who store offsets externally currently get the offset using position(..), store it externally, and use seek(..) to initialize the consumer next time. After this KIP they will need to store and use the leaderEpoch together with the offset. Should we also update the API so that users can get the leaderEpoch from position(...)? Not sure if it is OK to ask users to track the latest leaderEpoch of ConsumerRecord themselves.
> >
> > 5) Also, users who store offsets externally need to call seek(..) with the leaderEpoch to initialize the consumer. With the current KIP, users need to call seekToNearest(), whose name suggests that the final position may differ from what was requested. However, if users want to avoid auto offset reset and be notified explicitly when there is log truncation, then seekToNearest() probably does not help here. Would it make sense to replace seekToNearest() with seek(offset, leaderEpoch) + AdminClient.offsetsForLeaderEpochs(...)?
> >
> > Thanks,
> > Dong
> >
> > On Wed, Jun 27, 2018 at 3:57 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> >> Hey Guozhang,
> >>
> >> That's fair. In fact, perhaps we do not need this API at all. We already have the new seek() in this KIP, which can do the lookup based on epoch for this use case. I guess we should probably call it seekToNearest() though, to make it clear that the final position may be different from what was requested.
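Points 4) and 5) above contrast a seekToNearest()-style call, which may silently move the position, with an explicit seek(offset, leaderEpoch) plus an epoch lookup, which lets the user be told about truncation. The Java sketch below models only that difference. Everything in it is hypothetical: `StoredPosition`, `PositionRestorer`, and the lookup function are illustrative stand-ins, not Kafka APIs, and the lookup is assumed to return the end offset of the requested epoch on the current leader (the KIP-101 semantics discussed further down the thread).

```java
import java.util.function.IntToLongFunction;

// Hypothetical sketch: none of these types are Kafka classes.
class PositionRestorer {
    // Stand-in for an offsetsForLeaderEpochs-style lookup: leader epoch -> end offset of
    // that epoch on the current leader (assumed KIP-101 semantics).
    private final IntToLongFunction epochEndOffset;

    PositionRestorer(IntToLongFunction epochEndOffset) {
        this.epochEndOffset = epochEndOffset;
    }

    // seek(offset, leaderEpoch) plus a lookup: fail loudly if the stored offset was truncated away.
    long exact(StoredPosition stored) {
        long end = epochEndOffset.applyAsLong(stored.leaderEpoch());
        if (end < stored.offset()) {
            throw new LogTruncationException(stored.offset(), end);
        }
        return stored.offset();
    }

    // seekToNearest()-style behaviour: silently fall back to the end of the stored epoch.
    long nearest(StoredPosition stored) {
        return Math.min(stored.offset(), epochEndOffset.applyAsLong(stored.leaderEpoch()));
    }

    public static void main(String[] args) {
        // Pretend the current leader reports that epoch 2 ended at offset 150.
        PositionRestorer restorer = new PositionRestorer(epoch -> epoch == 2 ? 150L : 500L);
        StoredPosition stored = new StoredPosition(170L, 2);
        System.out.println(restorer.nearest(stored)); // 150
        try {
            restorer.exact(stored);
        } catch (LogTruncationException e) {
            System.out.println("truncated; epoch ended at " + e.epochEndOffset); // truncated; epoch ended at 150
        }
    }
}

// Stored externally: the next offset to consume plus the leader epoch of the last consumed record.
record StoredPosition(long offset, int leaderEpoch) {}

// Named after the LogTruncationException suggested in point 1); hypothetical here.
class LogTruncationException extends RuntimeException {
    final long epochEndOffset;

    LogTruncationException(long requestedOffset, long epochEndOffset) {
        super("offset " + requestedOffset + " is past the end of its epoch at " + epochEndOffset);
        this.epochEndOffset = epochEndOffset;
    }
}
```

Both methods consult the same lookup. They differ only in whether a truncated position is reported to the caller or adjusted quietly, which is the distinction point 5) draws.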
> >>
> >> Thanks,
> >> Jason
> >>
> >> On Wed, Jun 27, 2018 at 3:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> > Hi Jason,
> >> >
> >> > I think it is less worthwhile to add KafkaConsumer#offsetsForLeaderEpochs, since probably only very advanced users are aware of the leaderEpoch and hence would ever care to use it anyway. It is more like an admin client operation than a consumer client operation: if the motivation is to facilitate a customized reset policy, maybe adding it as AdminClient#offsetsForLeaderEpochs is better, as it is not an aggressive assumption that such advanced users are willing to use an admin client to get further information?
> >> >
> >> > Guozhang
> >> >
> >> > On Wed, Jun 27, 2018 at 2:07 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >> >
> >> > > Thanks for the feedback. I've updated the KIP. Specifically, I removed the "closest" reset option and the proposal to reset by timestamp when the precise truncation point cannot be determined. Instead, I proposed that we always reset using the nearest epoch when a reset policy is defined (either "earliest" or "latest"). Does that sound reasonable?
> >> > >
> >> > > One thing I am still debating is whether it would be better to have a separate API to find the closest offset using the leader epoch. In the current KIP, I suggested piggybacking this information on an exception, but I'm beginning to think it would be better not to hide the lookup. It is awkward to implement since it means delaying the exception, and the API may actually be useful when customizing reset logic if no auto reset policy is defined.
> >> > > I was thinking we can add an API like the following:
> >> > >
> >> > > Map<TopicPartition, OffsetAndEpoch> offsetsForLeaderEpochs(Map<TopicPartition, Integer> epochsToSearch)
> >> > >
> >> > > Thoughts?
> >> > >
> >> > > -Jason
> >> > >
> >> > > On Wed, Jun 27, 2018 at 11:12 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >> > >
> >> > > > @Dong
> >> > > >
> >> > > > Those are fair points. Both approaches require some fuzziness to reset the offset in these pathological scenarios, and we cannot guarantee at-least-once delivery either way unless we have the full history of leader epochs that were consumed. The KIP-101 logic may actually be more accurate than using timestamps because it does not depend on the messages which are written after the unclean leader election. The case we're talking about should be extremely rare in practice anyway. I also agree that we may not want to add new machinery if it only helps the old message format. Ok, let's go ahead and drop the timestamp.
> >> > > >
> >> > > > @Guozhang
> >> > > >
> >> > > >> * My current understanding is that, with unclean leader election turned on, exactly-once is out of the window since we cannot guarantee that all committed message markers will not be lost. And hence there is no need to have special handling logic for LOG_TRUNCATION or OOR error codes with read_committed turned on. Is that right?
> >> > > >
> >> > > > Yes, that's right. EoS and unclean leader election don't mix well. It may be worth considering separately whether we should try to reconcile the transaction log following an unclean leader election.
> >> > > > At least we may be able to prevent dangling transactions from blocking consumers. This KIP does not address this problem.
> >> > > >
> >> > > >> * MINOR: "if the epoch is greater than the minimum expected epoch, that the new epoch does not begin at an earlier offset than the fetch offset. In the latter case, the leader can respond with a new LOG_TRUNCATION error code" should it be "does not begin at a later offset than the fetch offset"?
> >> > > >
> >> > > > I think the comment is correct, though the phrasing may be confusing. We know truncation has occurred if there exists a larger epoch with a starting offset that is lower than the fetch offset. Let me try to rephrase this.
> >> > > >
> >> > > > Thanks,
> >> > > > Jason
> >> > > >
> >> > > > On Wed, Jun 27, 2018 at 9:17 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > >
> >> > > >> Jason, thanks for the KIP. A few comments:
> >> > > >>
> >> > > >> * I think Dong's question about whether to use the timestamp-based approach vs. start-offset-of-first-larger-epoch is valid; more specifically, with the timestamp-based approach we may still be resetting to an offset that falls into the truncated interval, and hence we may still miss some data, i.e. still not guarantee at-least-once. With start-offset-of-first-larger-epoch, I'm not sure if it will guarantee that no valid data is missed when we have consecutive log truncations (maybe we need to look back into the details of KIP-101 to figure it out). If the latter can indeed guarantee at-least-once, we could consider using that approach.
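Jason's rephrasing above (truncation has occurred if a larger epoch starts at an offset lower than the fetch offset) and the KIP-101 "start offset of the first larger epoch" lookup reduce to the same check, because start offsets never decrease as epochs grow. Here is a minimal Java sketch of that check. It assumes a simplified in-memory cache mapping each leader epoch to its start offset, and `LeaderEpochCache` with its methods is hypothetical rather than the broker's implementation.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical, simplified model of a leader's epoch cache (leader epoch -> start offset).
class LeaderEpochCache {
    private final NavigableMap<Integer, Long> startOffsets;
    private final long logEndOffset;

    LeaderEpochCache(Map<Integer, Long> startOffsets, long logEndOffset) {
        this.startOffsets = new TreeMap<>(startOffsets);
        this.logEndOffset = logEndOffset;
    }

    // KIP-101-style lookup: the end offset of `epoch` is the start offset of the first
    // larger epoch, or the log end offset if no larger epoch exists.
    long endOffsetFor(int epoch) {
        Map.Entry<Integer, Long> next = startOffsets.higherEntry(epoch);
        return next == null ? logEndOffset : next.getValue();
    }

    // Truncation has occurred if some larger epoch starts below the fetch offset. Assuming
    // start offsets never decrease with the epoch, checking the first larger epoch is enough.
    OptionalLong truncationOffset(long fetchOffset, int lastFetchedEpoch) {
        long end = endOffsetFor(lastFetchedEpoch);
        return end < fetchOffset ? OptionalLong.of(end) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Epoch 2 was lost in an unclean leader election; the new leader went from epoch 1 to 3.
        LeaderEpochCache cache = new LeaderEpochCache(Map.of(0, 0L, 1, 100L, 3, 150L), 200L);
        System.out.println(cache.truncationOffset(120L, 1)); // OptionalLong.empty
        System.out.println(cache.truncationOffset(170L, 2)); // OptionalLong[150]
    }
}
```

In the second call, the consumer read offsets 150 to 169 under epoch 2, which the current leader no longer has. The returned offset 150 is the "start offset of the first Leader Epoch larger than last epoch of the consumer" that Dong describes below.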
> >> > > >>
> >> > > >> * My current understanding is that, with unclean leader election turned on, exactly-once is out of the window since we cannot guarantee that all committed message markers will not be lost. And hence there is no need to have special handling logic for LOG_TRUNCATION or OOR error codes with read_committed turned on. Is that right?
> >> > > >>
> >> > > >> * MINOR: "if the epoch is greater than the minimum expected epoch, that the new epoch does not begin at an earlier offset than the fetch offset. In the latter case, the leader can respond with a new LOG_TRUNCATION error code" should it be "does not begin at a later offset than the fetch offset"?
> >> > > >>
> >> > > >> Guozhang
> >> > > >>
> >> > > >> On Tue, Jun 26, 2018 at 6:51 PM, Dong Lin <lindon...@gmail.com> wrote:
> >> > > >>
> >> > > >> > Hey Jason,
> >> > > >> >
> >> > > >> > Thanks for the explanation.
> >> > > >> >
> >> > > >> > Please correct me if this is wrong. The "unknown truncation offset" scenario happens when the consumer does not have the full leaderEpoch -> offset mapping. In this case we can still use the KIP-101-based approach to truncate the offset to "start offset of the first Leader Epoch larger than last epoch of the consumer", but it may be inaccurate. So the KIP chooses to use the timestamp-based approach, which is also best-effort.
> >> > > >> >
> >> > > >> > If this understanding is correct, for the "closest" offset reset policy and the "unknown truncation offset" scenario, I am wondering whether it may be better to replace the timestamp-based approach with the KIP-101-based approach. Compared to the timestamp-based approach, the KIP-101-based approach seems to simplify the API a bit since users do not need to understand timestamps. Like the timestamp-based approach, it is best-effort and does not guarantee that the consumer can consume all messages. It is not like KIP-279, which guarantees that a follower broker can consume all messages from the leader.
> >> > > >> >
> >> > > >> > Then it seems that the remaining difference is mostly about accuracy, i.e. how many messages will be duplicated or missed in the "unknown truncation offset" scenario. Not sure either one is clearly better than the other. Note that there are two scenarios mentioned in KIP-279 which are not addressed by KIP-101. Both scenarios require quick leadership changes between brokers, which suggests that the offset obtained by "start offset of the first Leader Epoch larger than last epoch of the consumer" under these two scenarios may be very close to the offset obtained from the message timestamp. Does this sound reasonable?
> >> > > >> >
> >> > > >> > Good point that users on the v1 format can benefit from the timestamp-based approach. On the other hand, it seems like a short-term benefit for users who have not migrated.
> >> > > >> > I am just not sure whether it is more important than designing a better API.
> >> > > >> >
> >> > > >> > Also, for both the "latest" and "earliest" reset policies, do you think it would make sense to also use the KIP-101-based approach to truncate the offset for the "unknown truncation offset" scenario?
> >> > > >> >
> >> > > >> > Thanks,
> >> > > >> > Dong
> >> > > >>
> >> > > >> --
> >> > > >> -- Guozhang
> >> >
> >> > --
> >> > -- Guozhang
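The reset rule in Jason's updated KIP (reset using the nearest epoch whenever "earliest" or "latest" is configured) can be combined with the batched `offsetsForLeaderEpochs` signature he proposed on Jun 27. The Java sketch below is hypothetical throughout. `TopicPartition` and `OffsetAndEpoch` are local records that only mirror the names in the thread, `ResetPolicy`, `EpochLookup` and `TruncationReset` are invented for illustration, and the lookup is assumed to return the end offset of each requested epoch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; these types only mirror names from the thread and are not Kafka classes.
class TruncationReset {
    // Returns new positions for partitions whose stored epoch ends before the current position.
    static Map<TopicPartition, Long> resetPositions(Map<TopicPartition, OffsetAndEpoch> positions,
                                                    EpochLookup lookup, ResetPolicy policy) {
        Map<TopicPartition, Integer> epochsToSearch = new HashMap<>();
        positions.forEach((tp, pos) -> epochsToSearch.put(tp, pos.leaderEpoch()));
        Map<TopicPartition, OffsetAndEpoch> epochEnds = lookup.offsetsForLeaderEpochs(epochsToSearch);

        Map<TopicPartition, Long> resets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndEpoch> entry : positions.entrySet()) {
            OffsetAndEpoch end = epochEnds.get(entry.getKey());
            if (end == null || end.offset() >= entry.getValue().offset()) {
                continue; // no truncation detected for this partition
            }
            if (policy == ResetPolicy.NONE) {
                // No auto reset policy: surface the truncation so the application can decide.
                throw new IllegalStateException("log truncation detected for " + entry.getKey());
            }
            // "earliest" and "latest" both reset to the nearest offset found via the leader epoch.
            resets.put(entry.getKey(), end.offset());
        }
        return resets;
    }

    public static void main(String[] args) {
        // Fake lookup: epoch 2 ended at offset 150; every other epoch extends to offset 500.
        EpochLookup lookup = epochs -> {
            Map<TopicPartition, OffsetAndEpoch> result = new HashMap<>();
            epochs.forEach((tp, epoch) -> result.put(tp, new OffsetAndEpoch(epoch == 2 ? 150L : 500L, epoch)));
            return result;
        };
        TopicPartition truncated = new TopicPartition("orders", 0);
        TopicPartition healthy = new TopicPartition("orders", 1);
        Map<TopicPartition, OffsetAndEpoch> positions = Map.of(
                truncated, new OffsetAndEpoch(170L, 2),
                healthy, new OffsetAndEpoch(300L, 5));
        System.out.println(resetPositions(positions, lookup, ResetPolicy.LATEST));
    }
}

enum ResetPolicy { EARLIEST, LATEST, NONE }

record TopicPartition(String topic, int partition) {}

record OffsetAndEpoch(long offset, int leaderEpoch) {}

interface EpochLookup {
    // Same shape as the offsetsForLeaderEpochs API proposed in the thread.
    Map<TopicPartition, OffsetAndEpoch> offsetsForLeaderEpochs(Map<TopicPartition, Integer> epochsToSearch);
}
```

With `NONE`, the sketch throws instead of resetting. That leaves the decision to the application, which is the case where Jason notes a standalone lookup API would be most useful.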