Hey Jun,

57. It's a fair point. I could go either way, but I'm slightly inclined to
just document the new API for now. We'll still support seeking to an offset
with corresponding epoch information, so deprecating the old seek() seems
like overkill.

60. The phrasing was a little confusing. Does this sound better?

"Log truncation is detected if there exists a leader epoch which is larger than
this epoch and begins at an offset earlier than the committed offset."
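
To make that rule concrete, here is a rough sketch of the check in Java. It is only an illustration of the sentence above, not how the consumer or broker actually implements it: the class name, the method name, and the idea of passing the epoch history as a map from leader epoch to the first offset of that epoch are all assumptions made up for this example.

```java
import java.util.Map;
import java.util.NavigableMap;

/** Illustrative only: a hypothetical helper, not part of the Kafka client. */
final class TruncationCheck {

    /**
     * @param epochStartOffsets assumed view of the log's epoch history:
     *                          leader epoch -> first offset written in that epoch
     * @param committedOffset   the offset that was committed
     * @param committedEpoch    the leader epoch stored with the commit ("this epoch")
     * @return true if some epoch larger than committedEpoch begins at an
     *         offset earlier than committedOffset
     */
    static boolean isTruncated(NavigableMap<Integer, Long> epochStartOffsets,
                               long committedOffset,
                               int committedEpoch) {
        // Only epochs strictly larger than the committed epoch are relevant.
        for (Map.Entry<Integer, Long> entry
                : epochStartOffsets.tailMap(committedEpoch, false).entrySet()) {
            if (entry.getValue() < committedOffset) {
                return true;
            }
        }
        return false;
    }
}
```

For example, with a commit at offset 200 in epoch 1 and a history in which epoch 2 begins at offset 150, the check reports truncation, since whatever was consumed between 150 and 200 under epoch 1 is no longer in the log. If epoch 2 begins at 200 or later, nothing that was consumed has been replaced.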

Thanks,
Jason


On Tue, Aug 7, 2018 at 12:11 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jason,
>
> Thanks for the update. I have some comments below:
>
> 1) Since FencedLeaderEpochException indicates that the metadata in the
> client is outdated, should it extend InvalidMetadataException?
>
> 2) It is mentioned that "To fix the problem with KIP-232, we will add the
> leader epoch the ListOffsets response. The consumer will use this in its
> first fetch request after resetting offsets". If the consumer sends a
> ListOffsetRequest to a broker that is no longer the leader, and the broker
> still thinks it is the leader, then the broker may return a ListOffsetResponse
> whose leaderEpoch is smaller than the leaderEpoch in the metadata of the
> consumer. In this case the consumer probably should not just send a FetchRequest
> with the leaderEpoch of the ListOffsetResponse, right? I am wondering
> whether we should also include CurrentLeaderEpoch in the ListOffsetRequest.
>
> 3) Currently the new field added in the OffsetCommitRequest/
> OffsetFetchResponse is named LastLeaderEpoch. For the same reason that we
> are not naming the existing field "Offset" as "LastOffset", would it be
> more consistent to just name the new field as LeaderEpoch? Same for the new
> API in the class OffsetAndMetadata.
>
> 4) Could we clarify in the KIP where the value of CurrentLeaderEpoch in the
> FetchRequest comes from? I suppose this value can be updated by the
> MetadataResponse, right? If so, maybe we should also clarify that the client
> should reject a MetadataResponse if the leaderEpoch in the metadata response
> is smaller than what the client already knows from, e.g., seek(...) or
> OffsetFetchResponse?
>
>
> Thanks,
> Dong
>
>
> On Mon, Aug 6, 2018 at 5:30 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Jason,
> >
> > Thanks for the reply. They all make sense. Just a couple more minor
> > comments.
> >
> > 57. I was thinking that it will be useful to encourage people to use the
> > new seek() api to get better semantics. Deprecating the old seek api is one
> > way. I guess we could also just document it for now.
> >
> > 60. "Log truncation is detected if the first offset of the epoch for the
> > committed offset is larger than this epoch and begins at an earlier
> > offset." It seems that we should add "that" before "is larger than"?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Aug 6, 2018 at 3:07 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > I spent a little more time looking at the usage in WorkerSinkTask. I think
> > > actually the initialization of the positions in the assignment callback is
> > > not strictly necessary. We keep a map of the current consumed offsets which
> > > is updated as we consume the data. As far as I can tell, we could either
> > > skip the initialization and wait until the first fetched records come in or
> > > we could use the committed() API to initialize positions. I think the root
> > > of it is the argument Anna made previously. The leader epoch lets us track
> > > the history of records that we have consumed. It is only useful when we
> > > want to tell whether records we have consumed were lost. So getting the
> > > leader epoch of an arbitrary position that was seeked doesn't really make
> > > sense. The dependence on the consumed records is most explicit if we only
> > > expose the leader epoch inside the fetched records. We might consider
> > > adding a `lastConsumedLeaderEpoch` API to expose it directly, but I'm
> > > inclined to leave that as potential future work.
> > >
> > > A couple additional notes:
> > >
> > > 1. I've renamed OffsetAndMetadata.leaderEpoch to
> > > OffsetAndMetadata.lastLeaderEpoch. In fact, the consumer doesn't know what
> > > the leader epoch of the committed offset should be, so this just clarifies
> > > the expected usage.
> > >
> > > 2. I decided to add a helper to ConsumerRecords to get the next offsets. We
> > > would use this in WorkerSinkTask and external storage use cases to simplify
> > > the commit logic. If we are consuming batch by batch, then we don't need
> > > the message-level bookkeeping.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Mon, Aug 6, 2018 at 10:28 AM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks for the review. Responses below:
> > > >
> > > > 50. Yes, that is right. I clarified this in the KIP.
> > > >
> > > > 51. Yes, updated the KIP to mention.
> > > >
> > > > 52. Yeah, this was a reference to a previous iteration. I've fixed it.
> > > >
> > > > 53. I changed the API to use an `Optional<Integer>` for the leader epoch
> > > > and added a note about the default value. Does that seem reasonable?
> > > >
> > > > 54. We discussed this above, but could not find a great option. The
> > > > options are to add a new API (e.g. positionAndEpoch) or to rely on the user
> > > > to get the epoch from the fetched records. We were leaning toward the
> > > > latter, but I admit it was not fully satisfying. In this case, Connect
> > > > would need to track the last consumed offsets manually instead of relying
> > > > on the consumer. We also considered adding a convenience method to
> > > > ConsumerRecords to get the offset to commit for all fetched partitions.
> > > > This makes the additional bookkeeping pretty minimal. What do you think?
> > > >
> > > > 55. I clarified in the KIP. I was mainly thinking of situations where a
> > > > previously valid offset becomes out of range.
> > > >
> > > > 56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is
> > > > and use CurrentLeaderEpoch for both OffsetForLeaderEpoch and the Fetch
> > > > APIs. I think Dong suggested this previously as well.
> > > >
> > > > 57. We could, but I'm not sure there's a strong reason to do so. I was
> > > > thinking we would leave it around for convenience, but let me know if you
> > > > think we should do otherwise.
> > > >
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > > On Fri, Aug 3, 2018 at 4:49 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > >> Hi, Jason,
> > > >>
> > > > >> Thanks for the updated KIP. Well thought-through. Just a few minor
> > > > >> comments below.
> > > > >>
> > > > >> 50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess
> > > > >> under the cover, it will make an OffsetsForLeaderEpoch request to determine
> > > > >> if the seeked offset is still valid before fetching? If so, it will be
> > > > >> useful to document this in the wiki.
> > > > >>
> > > > >> 51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I
> > > > >> guess the consumer will also make an OffsetsForLeaderEpoch request to
> > > > >> determine if the last consumed offset is still valid before fetching? If
> > > > >> so, it will be useful to document this in the wiki.
> > > > >>
> > > > >> 52. "If the consumer seeks to the middle of the log, for example, then we
> > > > >> will use the sentinel value -1 and the leader will skip the epoch
> > > > >> validation. " Is this true? If the consumer seeks using
> > > > >> seek(TopicPartition partition, OffsetAndMetadata offset) and the seeked
> > > > >> offset is valid, the consumer can/should use the leaderEpoch in the cached
> > > > >> metadata for fetching?
> > > > >>
> > > > >> 53. OffsetAndMetadata. For backward compatibility, we need to support
> > > > >> constructing OffsetAndMetadata without providing leaderEpoch. Could we
> > > > >> define the default value of leaderEpoch if not provided and the semantics
> > > > >> of that (e.g., skipping the epoch validation)?
> > > > >>
> > > > >> 54. I saw the following code in WorkerSinkTask in Connect. It saves the
> > > > >> offset obtained through position(), which can be committed later. Since
> > > > >> position() doesn't return the leaderEpoch, this can lead to a committed
> > > > >> offset without leaderEpoch. Not sure how common this usage is, but what's
> > > > >> the recommendation for such users?
> > > >>
> > > > >> private class HandleRebalance implements ConsumerRebalanceListener {
> > > > >>     @Override
> > > > >>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> > > > >>         log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
> > > > >>         lastCommittedOffsets = new HashMap<>();
> > > > >>         currentOffsets = new HashMap<>();
> > > > >>         for (TopicPartition tp : partitions) {
> > > > >>             long pos = consumer.position(tp);
> > > > >>             lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
> > > >>
> > > >> 55. "With this KIP, the only case in which this is possible is if
> the
> > > >> consumer fetches from an offset earlier than the log start offset."
> Is
> > > >> that
> > > >> true? I guess a user could seek to a large offset without providing
> > > >> leaderEpoch, which can cause the offset to be larger than the log
> end
> > > >> offset during fetch?
> > > >>
> > > >> 56. In the schema for OffsetForLeaderEpochRequest, LeaderEpoch seems
> > to
> > > be
> > > >> an existing field. Is LeaderEpochQuery the new field? The name is
> not
> > > very
> > > >> intuitive. It will be useful to document its meaning.
> > > >>
> > > >> 57. Should we deprecate the following api?
> > > >> void seek(TopicPartition partition, long offset);
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >>
> > > >> On Fri, Aug 3, 2018 at 9:32 AM, Jason Gustafson <ja...@confluent.io
> >
> > > >> wrote:
> > > >>
> > > >> > Hey All,
> > > >> >
> > > > >> > I think I've addressed all pending review. If there is no additional
> > > > >> > feedback, I'll plan to start a vote thread next week.
> > > >> >
> > > >> > Thanks,
> > > >> > Jason
> > > >> >
> > > >> > On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > >> >
> > > >> > > Hey Jason,
> > > >> > >
> > > >> > > Thanks for your reply. I will comment below.
> > > >> > >
> > > > >> > > Regarding 1, we probably can not simply rename both to `LeaderEpoch`
> > > > >> > > because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.
> > > > >> > >
> > > > >> > > Regarding 5, I don't feel strongly about this. I agree with the two
> > > > >> > > benefits of having two error codes: 1) not having to refresh metadata
> > > > >> > > when the consumer sees UNKNOWN_LEADER_EPOCH and 2) providing more
> > > > >> > > information in the log for debugging. Whether or not these two benefits
> > > > >> > > are useful enough for one more error code may be subjective. I will let
> > > > >> > > you and others determine this.
> > > > >> > >
> > > > >> > > Regarding 6, yeah overloading seek() looks good to me.
> > > >> > >
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Dong
> > > >> > >
> > > >> > >
> > > >> > > On Mon, Jul 30, 2018 at 9:33 AM, Jason Gustafson <
> > > ja...@confluent.io>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hey Dong,
> > > >> > > >
> > > >> > > > Thanks for the detailed review. Responses below:
> > > >> > > >
> > > >> > > > 1/2: Thanks for noticing the inconsistency. Would it be
> > reasonable
> > > >> to
> > > >> > > > simply call it LeaderEpoch for both APIs?
> > > >> > > >
> > > >> > > > 3: I agree it should be a map. I will update.
> > > >> > > >
> > > >> > > > 4: Fair point. I think we should always be able to identify an
> > > >> offset.
> > > >> > > > Let's remove the Optional for now and reconsider if we find an
> > > >> > unhandled
> > > >> > > > case during implementation.
> > > >> > > >
> > > >> > > > 5: Yeah, I was thinking about this. The two error codes could
> be
> > > >> > handled
> > > >> > > > similarly, so we might merge them. Mainly I was thinking that
> it
> > > >> will
> > > >> > be
> > > >> > > > useful for consumers/replicas to know whether they are ahead
> or
> > > >> behind
> > > >> > > the
> > > >> > > > leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH,
> it
> > > >> need
> > > >> > not
> > > >> > > > refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH
> > > error,
> > > >> it
> > > >> > > > could just stop fetching and await the LeaderAndIsr request
> that
> > > it
> > > >> is
> > > >> > > > missing. It probably also makes debugging a little bit
> easier. I
> > > >> guess
> > > >> > > I'm
> > > >> > > > a bit inclined to keep both error codes, but I'm open to
> > > >> > reconsideration
> > > >> > > if
> > > >> > > > you feel strongly. Another point to consider is whether we
> > should
> > > >> > > continue
> > > >> > > > using NOT_LEADER_FOR_PARTITION if a follower receives an
> > > unexpected
> > > >> > > fetch.
> > > >> > > > The leader epoch would be different in this case so we could
> use
> > > >> one of
> > > >> > > the
> > > >> > > > invalid epoch error codes instead since they contain more
> > > >> information.
> > > >> > > >
> > > >> > > > 6: I agree the name is not ideal in that scenario. What if we
> > > >> > overloaded
> > > >> > > > `seek`?
> > > >> > > >
> > > >> > > > 7: Sure, I will mention this.
> > > >> > > >
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Jason
> > > >> > > >
> > > >> > > > On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin <
> lindon...@gmail.com>
> > > >> wrote:
> > > >> > > >
> > > >> > > > > Hey Jason,
> > > >> > > > >
> > > > >> > > > > Thanks for the update! I agree with the current proposal overall. I
> > > > >> > > > > have some minor comments related to naming etc.
> > > > >> > > > >
> > > > >> > > > > 1) I don't feel strongly about this and will just leave it here for
> > > > >> > > > > discussion. Would it be better to rename "CurrentLeaderEpoch" to
> > > > >> > > > > "ExpectedLeaderEpoch" for the new field in the
> > > > >> > > > > OffsetsForLeaderEpochRequest? The reason is that "CurrentLeaderEpoch"
> > > > >> > > > > may not necessarily be the true current leader epoch if the consumer
> > > > >> > > > > has stale metadata. "ExpectedLeaderEpoch" shows that this epoch is what
> > > > >> > > > > the consumer expects on the broker, which may or may not be the true
> > > > >> > > > > value.
> > > > >> > > > >
> > > > >> > > > > 2) Currently we add the field "LeaderEpoch" to FetchRequest and the
> > > > >> > > > > field "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that
> > > > >> > > > > both fields are compared with the leaderEpoch in the broker, would it
> > > > >> > > > > be better to give them the same name?
> > > > >> > > > >
> > > > >> > > > > 3) Currently LogTruncationException.truncationOffset() returns
> > > > >> > > > > Optional<OffsetAndMetadata> to the user. Should it return
> > > > >> > > > > Optional<Map<TopicPartition, OffsetAndMetadata>> to handle the scenario
> > > > >> > > > > where the leaderEpoch of multiple partitions is different from the
> > > > >> > > > > leaderEpoch in the broker?
> > > > >> > > > >
> > > > >> > > > > 4) Currently LogTruncationException.truncationOffset() returns an
> > > > >> > > > > Optional value. Could you explain a bit more when it will return
> > > > >> > > > > Optional.empty()? I am trying to understand whether it is simpler and
> > > > >> > > > > reasonable to replace Optional.empty() with
> > > > >> > > > > OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).
> > > > >> > > > >
> > > > >> > > > > 5) Do we also need to add a new retriable exception for error code
> > > > >> > > > > FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH
> > > > >> > > > > and UNKNOWN_LEADER_EPOCH? It seems that the current KIP uses these two
> > > > >> > > > > error codes in the same way and the exception for these two error codes
> > > > >> > > > > is not exposed to the user. Maybe we should combine them into one
> > > > >> > > > > error, e.g. INVALID_LEADER_EPOCH?
> > > > >> > > > >
> > > > >> > > > > 6) For users who have turned off auto offset reset, when
> > > > >> > > > > consumer.poll() throws LogTruncationException, it seems that the user
> > > > >> > > > > will most likely call seekToCommitted(offset, leaderEpoch) where offset
> > > > >> > > > > and leaderEpoch are obtained from
> > > > >> > > > > LogTruncationException.truncationOffset(). In this case, the offset
> > > > >> > > > > used here is not committed, which is inconsistent with the method name
> > > > >> > > > > seekToCommitted(...). Would it be better to rename the method to e.g.
> > > > >> > > > > seekToLastConsumedMessage()?
> > > > >> > > > >
> > > > >> > > > > 7) Per point 3 in Jun's comment, would it be useful to explicitly
> > > > >> > > > > specify in the KIP that we will log the truncation event if the user
> > > > >> > > > > has turned on the auto offset reset policy?
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > > Dong
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson <
> > > >> > ja...@confluent.io>
> > > >> > > > > wrote:
> > > >> > > > >
> > > > >> > > > > > Thanks Anna, you are right on both points. I updated the KIP.
> > > >> > > > > >
> > > >> > > > > > -Jason
> > > >> > > > > >
> > > >> > > > > > On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner <
> > > >> a...@confluent.io>
> > > >> > > > wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Jason,
> > > >> > > > > > >
> > > > >> > > > > > > Thanks for the update. I agree with the current proposal.
> > > > >> > > > > > >
> > > > >> > > > > > > Two minor comments:
> > > > >> > > > > > > 1) In the “API Changes” section, the first paragraph says that “users
> > > > >> > > > > > > can catch the more specific exception type and use the new
> > > > >> > > > > > > `seekToNearest()` API defined below.” Since LogTruncationException
> > > > >> > > > > > > “will include the partitions that were truncated and the offset of
> > > > >> > > > > > > divergence”, shouldn’t the client use seek(offset) to seek to the
> > > > >> > > > > > > offset of divergence in response to the exception?
> > > > >> > > > > > > 2) In the “Protocol Changes” section, the OffsetsForLeaderEpoch
> > > > >> > > > > > > subsection says “Note that consumers will send a sentinel value (-1)
> > > > >> > > > > > > for the current epoch and the broker will simply disregard that
> > > > >> > > > > > > validation.” Is that still true with MetadataResponse containing
> > > > >> > > > > > > leader epoch?
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > Anna
> > > >> > > > > > >
> > > >> > > > > > > On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson <
> > > >> > > ja...@confluent.io>
> > > >> > > > > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Hi All,
> > > >> > > > > > > >
> > > > >> > > > > > > > I have made some updates to the KIP. As many of you know, a side
> > > > >> > > > > > > > project of mine has been specifying the Kafka replication protocol in
> > > > >> > > > > > > > TLA. You can check out the code here if you are interested:
> > > > >> > > > > > > > https://github.com/hachikuji/kafka-specification. In addition to
> > > > >> > > > > > > > uncovering a couple unknown bugs in the replication protocol (e.g.
> > > > >> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me
> > > > >> > > > > > > > validate the behavior in this KIP. In fact, the original version I
> > > > >> > > > > > > > proposed had a weakness. I initially suggested letting the leader
> > > > >> > > > > > > > validate the expected epoch at the fetch offset. This made sense for
> > > > >> > > > > > > > the consumer in the handling of unclean leader election, but it was
> > > > >> > > > > > > > not strong enough to protect the follower in all cases. In order to
> > > > >> > > > > > > > make advancement of the high watermark safe, for example, the leader
> > > > >> > > > > > > > actually needs to be sure that every follower in the ISR matches its
> > > > >> > > > > > > > own epoch.
> > > >> > > > > > > >
> > > > >> > > > > > > > I attempted to fix this problem by treating the epoch in the fetch
> > > > >> > > > > > > > request slightly differently for consumers and followers. For
> > > > >> > > > > > > > consumers, it would be the expected epoch of the record at the fetch
> > > > >> > > > > > > > offset, and the leader would raise a LOG_TRUNCATION error if the
> > > > >> > > > > > > > expectation failed. For followers, it would be the current epoch and
> > > > >> > > > > > > > the leader would require that it match its own epoch. This was
> > > > >> > > > > > > > unsatisfying both because of the inconsistency in behavior and because
> > > > >> > > > > > > > the consumer was left with the weaker fencing that we already knew was
> > > > >> > > > > > > > insufficient for the replicas. Ultimately I decided that we should
> > > > >> > > > > > > > make the behavior consistent and that meant that the consumer needed
> > > > >> > > > > > > > to act more like a following replica. Instead of checking for
> > > > >> > > > > > > > truncation while fetching, the consumer should check for truncation
> > > > >> > > > > > > > after leader changes. After checking for truncation, the consumer can
> > > > >> > > > > > > > then use the current epoch when fetching and get the stronger
> > > > >> > > > > > > > protection that it provides. What this means is that the Metadata API
> > > > >> > > > > > > > must include the current leader epoch. Given the problems we have had
> > > > >> > > > > > > > around stale metadata and how challenging they have been to debug, I'm
> > > > >> > > > > > > > convinced that this is a good idea in any case and it resolves the
> > > > >> > > > > > > > inconsistent behavior in the Fetch API. The downside is that there
> > > > >> > > > > > > > will be some additional overhead upon leader changes, but I don't
> > > > >> > > > > > > > think it is a major concern since leader changes are rare and the
> > > > >> > > > > > > > OffsetForLeaderEpoch request is cheap.
> > > >> > > > > > > >
> > > > >> > > > > > > > This approach leaves the door open for some interesting follow-up
> > > > >> > > > > > > > improvements. For example, now that we have the leader epoch in the
> > > > >> > > > > > > > Metadata request, we can implement similar fencing for the Produce
> > > > >> > > > > > > > API. And now that the consumer can reason about truncation, we could
> > > > >> > > > > > > > consider having a configuration to expose records beyond the high
> > > > >> > > > > > > > watermark. This would let users trade lower end-to-end latency for
> > > > >> > > > > > > > weaker durability semantics. It is sort of like having an acks=0
> > > > >> > > > > > > > option for the consumer. Neither of these options is included in this
> > > > >> > > > > > > > KIP; I am just mentioning them as potential work for the future.
> > > >> > > > > > > >
> > > > >> > > > > > > > Finally, based on the discussion in this thread, I have added the
> > > > >> > > > > > > > seekToCommitted API for the consumer. Please take a look and let me
> > > > >> > > > > > > > know what you think.
> > > >> > > > > > > >
> > > >> > > > > > > > Thanks,
> > > >> > > > > > > > Jason
> > > >> > > > > > > >
> > > >> > > > > > > > On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang <
> > > >> > > wangg...@gmail.com
> > > >> > > > >
> > > >> > > > > > > wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hi Jason,
> > > >> > > > > > > > >
> > > > >> > > > > > > > > The proposed API seems reasonable to me too. Could you please also
> > > > >> > > > > > > > > update the wiki page (
> > > > >> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
> > > > >> > > > > > > > > with a section, say "workflow", on how the proposed API will be used
> > > > >> > > > > > > > > together with others for:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > 1. consumer callers handling a LogTruncationException.
> > > > >> > > > > > > > > 2. consumer internals for handling a retriable
> > > > >> > > > > > > > > UnknownLeaderEpochException.
> > > >> > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > > > Guozhang
> > > >> > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner <
> > > >> > > > a...@confluent.io>
> > > >> > > > > > > > wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Hi Jason,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > > >> > > > > > > > > > I also like your proposal and agree that
> > > > >> > > > > > > > > > KafkaConsumer#seekToCommitted() is more intuitive as a way to
> > > > >> > > > > > > > > > initialize both the consumer's position and its fetch state.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > My understanding is that KafkaConsumer#seekToCommitted() is purely
> > > > >> > > > > > > > > > for clients who store their offsets externally, right? And we are
> > > > >> > > > > > > > > > still going to add KafkaConsumer#findOffsets() in this KIP as we
> > > > >> > > > > > > > > > discussed, so that the client can handle LogTruncationException?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Thanks,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Anna
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > On Thu, Jul 12, 2018 at 3:57 PM Dong Lin <
> > > >> > > lindon...@gmail.com>
> > > >> > > > > > > wrote:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > Hey Jason,
> > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > It is a great summary. The solution sounds good. I might have
> > > > >> > > > > > > > > > > minor comments regarding the method name. But we can discuss those
> > > > >> > > > > > > > > > > minor points later after we reach consensus on the high level API.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > Dong
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > On Thu, Jul 12, 2018 at 11:41 AM, Jason
> Gustafson
> > <
> > > >> > > > > > > > ja...@confluent.io>
> > > >> > > > > > > > > > > wrote:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > > Hey Anna and Dong,
> > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Thanks a lot for the great discussion. I've been hanging back a
> > > > >> > > > > > > > > > > > bit because honestly the best option hasn't seemed clear. I agree
> > > > >> > > > > > > > > > > > with Anna's general observation that there is a distinction
> > > > >> > > > > > > > > > > > between the position of the consumer and its fetch state up to
> > > > >> > > > > > > > > > > > that position. If you think about it, a committed offset actually
> > > > >> > > > > > > > > > > > represents both of these. The metadata is used to initialize the
> > > > >> > > > > > > > > > > > state of the consumer application and the offset initializes the
> > > > >> > > > > > > > > > > > position. Additionally, we are extending the offset commit in
> > > > >> > > > > > > > > > > > this KIP to also include the last epoch fetched by the consumer,
> > > > >> > > > > > > > > > > > which is used to initialize the internal fetch state. Of course
> > > > >> > > > > > > > > > > > if you do an arbitrary `seek` and immediately commit offsets,
> > > > >> > > > > > > > > > > > then there won't be a last epoch to commit. This seems intuitive
> > > > >> > > > > > > > > > > > since there is no fetch state in this case. We only commit fetch
> > > > >> > > > > > > > > > > > state when we have it.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > So if we think about a committed offset as initializing both the consumer's
> > > >> > > > > > > > > > > > > position and its fetch state, then the gap in the API is evidently that we don't
> > > >> > > > > > > > > > > > > have a way to initialize the consumer to a committed offset. We do it implicitly
> > > >> > > > > > > > > > > > > of course for offsets stored in Kafka, but since external storage is a use case
> > > >> > > > > > > > > > > > > we support, then we should have an explicit API as well. Perhaps something like
> > > >> > > > > > > > > > > > > this:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > seekToCommitted(TopicPartition, OffsetAndMetadata)
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > In this KIP, we are proposing to allow the `OffsetAndMetadata` object to include
> > > >> > > > > > > > > > > > > the leader epoch, so I think this would have the same effect as Anna's suggested
> > > >> > > > > > > > > > > > > `seekToRecord`. But perhaps it is a more natural fit given the current API?
> > > >> > > > > > > > > > > > > Furthermore, if we find a need for additional metadata in the offset commit API
> > > >> > > > > > > > > > > > > in the future, then we will just need to modify the `OffsetAndMetadata` object
> > > >> > > > > > > > > > > > > and we will not need a new `seek` API.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > With this approach, I think then we can leave the `position` API as it is. The
> > > >> > > > > > > > > > > > > position of the consumer is still just the next expected fetch offset. If a user
> > > >> > > > > > > > > > > > > needs to record additional state based on previous fetch progress, then they
> > > >> > > > > > > > > > > > > would use the result of the previous fetch to obtain it. This makes the
> > > >> > > > > > > > > > > > > dependence on fetch progress explicit. I think we could make this a little more
> > > >> > > > > > > > > > > > > convenient with a helper in the `ConsumerRecords` object, but I think that's
> > > >> > > > > > > > > > > > > more of a nice-to-have.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > Thoughts?
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > By the way, I have been iterating a little bit on the replica side of this KIP.
> > > >> > > > > > > > > > > > > My initial proposal in fact did not have strong enough fencing to protect all of
> > > >> > > > > > > > > > > > > the edge cases. I believe the current proposal fixes the problems, but I am
> > > >> > > > > > > > > > > > > still verifying the model.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > Jason
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Hey Anna,
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > Thanks much for the explanation. Approach 1 also sounds good to me. I think
> > > >> > > > > > > > > > > > > > findOffsets() is useful for users who don't use automatic offset reset policy.
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > Just one more question. Since users who store offsets externally need to
> > > >> > > > > > > > > > > > > > provide leaderEpoch to findOffsets(...), do we need an extra API for users to
> > > >> > > > > > > > > > > > > > get both offset and leaderEpoch, e.g. recordPosition()?
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > > Dong
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <a...@confluent.io> wrote:
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > Hi Dong,
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > What I called “not covering all use cases” is what you call best-effort (not
> > > >> > > > > > > > > > > > > > > guaranteeing some corner cases). I think we are on the same page here.
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > I wanted to be clear in the API whether the consumer seeks to a position
> > > >> > > > > > > > > > > > > > > (offset) or to a record (offset, leader epoch). The only use-case of seeking
> > > >> > > > > > > > > > > > > > > to a record is seeking to a committed offset for a user who stores committed
> > > >> > > > > > > > > > > > > > > offsets externally. (Unless users find some other reason to seek to a
> > > >> > > > > > > > > > > > > > > record.) I thought it was possible to provide this functionality with
> > > >> > > > > > > > > > > > > > > findOffset(offset, leader epoch) followed by a seek(offset). However, you are
> > > >> > > > > > > > > > > > > > > right that this will not handle the race condition where the non-divergent
> > > >> > > > > > > > > > > > > > > offset found by findOffset() could change again before the consumer does the
> > > >> > > > > > > > > > > > > > > first fetch.
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > Regarding position(): if we add a position that returns (offset, leader
> > > >> > > > > > > > > > > > > > > epoch), this is specifically a position after a record that was actually
> > > >> > > > > > > > > > > > > > > consumed or the position of a committed record. In which case, I still think
> > > >> > > > > > > > > > > > > > > it’s cleaner to get a record position of a consumed message from a new helper
> > > >> > > > > > > > > > > > > > > method in ConsumerRecords() or from committed offsets.
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > I think all the use-cases could then be covered with:
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > (Approach 1)
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > seekToRecord(offset, leaderEpoch): this will just initialize/set the consumer
> > > >> > > > > > > > > > > > > > > state;
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > If we agree that the race condition is also a corner case, then I think we
> > > >> > > > > > > > > > > > > > > can cover use-cases with:
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > (Approach 2)
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > findOffsets(offset, leaderEpoch) returns offset. We still want leader epoch
> > > >> > > > > > > > > > > > > > > as a parameter for the users who store their committed offsets externally.
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > I am actually now leaning more to approach 1, since it is more explicit, and
> > > >> > > > > > > > > > > > > > > maybe there are more use cases for it.
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > Anna
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >> > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > Hey Anna,
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > Thanks for the comment. To answer your question, it seems that we can cover
> > > >> > > > > > > > > > > > > > > > all cases in this KIP. As stated in the "Consumer Handling" section, a
> > > >> > > > > > > > > > > > > > > > KIP-101 based approach will be used to derive the truncation offset from
> > > >> > > > > > > > > > > > > > > > the 2-tuple (offset, leaderEpoch). This approach is best effort and it is
> > > >> > > > > > > > > > > > > > > > inaccurate only in very rare scenarios (as described in KIP-279).
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > By using seek(offset, leaderEpoch), the consumer will still be able to
> > > >> > > > > > > > > > > > > > > > follow this best-effort approach to detect log truncation and determine the
> > > >> > > > > > > > > > > > > > > > truncation offset. On the other hand, if we use seek(offset), the consumer
> > > >> > > > > > > > > > > > > > > > will not detect log truncation in some cases, which weakens the guarantee
> > > >> > > > > > > > > > > > > > > > of this KIP. Does this make sense?
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > > > > Dong
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <a...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > Sorry, I hit "send" before finishing. Continuing...
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > 2) Hiding most of the consumer handling log truncation logic with minimal
> > > >> > > > > > > > > > > > > > > > > exposure in KafkaConsumer API. I was proposing this path.
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > Before answering your specific questions… I want to respond to your
> > > >> > > > > > > > > > > > > > > > > comment “In general, maybe we should discuss the final solution that
> > > >> > > > > > > > > > > > > > > > > covers all cases?”. With the current KIP, we don’t cover all cases of the
> > > >> > > > > > > > > > > > > > > > > consumer detecting log truncation because the KIP proposes a leader epoch
> > > >> > > > > > > > > > > > > > > > > cache in the consumer that does not persist across restarts. Plus, we
> > > >> > > > > > > > > > > > > > > > > only store the last committed offset (either internally or users can
> > > >> > > > > > > > > > > > > > > > > store externally). This has a limitation that the consumer will not
> > > >> > > > > > > > > > > > > > > > > always be able to find the point of truncation just because we have a
> > > >> > > > > > > > > > > > > > > > > limited history (just one data point).
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > So, maybe we should first agree on whether we accept that storing last
> > > >> > > > > > > > > > > > > > > > > committed offset/leader epoch has a limitation that the consumer will not
> > > >> > > > > > > > > > > > > > > > > be able to detect log truncation in all cases?
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > Anna
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <a...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > Hi Dong,
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > Thanks for the follow up! I finally have a much clearer understanding of
> > > >> > > > > > > > > > > > > > > > > > where you are coming from.
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > You are right. The success of findOffsets()/finding a point of
> > > >> > > > > > > > > > > > > > > > > > non-divergence depends on whether we have enough entries in the
> > > >> > > > > > > > > > > > > > > > > > consumer's leader epoch cache. However, I think this is a fundamental
> > > >> > > > > > > > > > > > > > > > > > limitation of having a leader epoch cache that does not persist across
> > > >> > > > > > > > > > > > > > > > > > consumer restarts.
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > If we consider the general case where the consumer may or may not have
> > > >> > > > > > > > > > > > > > > > > > this cache, then I see two paths:
> > > >> > > > > > > > > > > > > > > > > > 1) Letting the user track the leader epoch history externally, and have
> > > >> > > > > > > > > > > > > > > > > > more exposure to leader epoch and finding the point of non-divergence in
> > > >> > > > > > > > > > > > > > > > > > KafkaConsumer API. I understand this is the case you were talking about.
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > > > > >> Hey Anna,
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> Thanks much for your detailed explanation and example! It does help me
> > > >> > > > > > > > > > > > > > > > >> understand the difference between our understanding.
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> So it seems that the solution based on findOffsets() currently focuses
> > > >> > > > > > > > > > > > > > > > >> mainly on the scenario where the consumer has a cached leaderEpoch ->
> > > >> > > > > > > > > > > > > > > > >> offset mapping, whereas I was thinking about the general case where the
> > > >> > > > > > > > > > > > > > > > >> consumer may or may not have this cache. I guess that is why we have a
> > > >> > > > > > > > > > > > > > > > >> different understanding here. I have some comments below.
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed
> > > >> > > > > > > > > > > > > > > > >> by seek(offset) works if consumer has the cached leaderEpoch -> offset
> > > >> > > > > > > > > > > > > > > > >> mapping. But if we assume consumer has this cache, do we need to have
> > > >> > > > > > > > > > > > > > > > >> leaderEpoch in the findOffsets(...)? Intuitively, the findOffsets(offset)
> > > >> > > > > > > > > > > > > > > > >> can also derive the leaderEpoch using offset just like the proposed
> > > >> > > > > > > > > > > > > > > > >> solution does with seek(offset).
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> 4) If consumer does not have cached leaderEpoch -> offset mapping, which
> > > >> > > > > > > > > > > > > > > > >> is the case if consumer is restarted on a new machine, then it is not
> > > >> > > > > > > > > > > > > > > > >> clear what leaderEpoch would be included in the FetchRequest if consumer
> > > >> > > > > > > > > > > > > > > > >> does seek(offset). This is the case that motivates the first question of
> > > >> > > > > > > > > > > > > > > > >> the previous email. In general, maybe we should discuss the final
> > > >> > > > > > > > > > > > > > > > >> solution that covers all cases?
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> 5) The second question in my previous email is related to the following
> > > >> > > > > > > > > > > > > > > > >> paragraph:
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> "... In some cases, offsets returned from position() could be actual
> > > >> > > > > > > > > > > > > > > > >> consumed messages by this consumer identified by {offset, leader epoch}.
> > > >> > > > > > > > > > > > > > > > >> In other cases, position() returns offset that was not actually consumed.
> > > >> > > > > > > > > > > > > > > > >> Suppose, the user calls position() for the last offset...".
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> I guess my point is that, if user calls position() for the last offset
> > > >> > > > > > > > > > > > > > > > >> and uses that offset in seek(...), then user can probably just call
> > > >> > > > > > > > > > > > > > > > >> Consumer#seekToEnd() without calling position() and seek(...). Similarly
> > > >> > > > > > > > > > > > > > > > >> user can call Consumer#seekToBeginning() to seek to the earliest position
> > > >> > > > > > > > > > > > > > > > >> without calling position() and seek(...). Thus position() only needs to
> > > >> > > > > > > > > > > > > > > > >> return the actual consumed messages identified by {offset, leader epoch}.
> > > >> > > > > > > > > > > > > > > > >> Does this make sense?
> > > >> > > > > > > > > > > > > > > > >>
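Points 3) and 4) above both hinge on whether the consumer holds a cached leaderEpoch -> offset mapping. A minimal sketch of such a cache follows, assuming a hypothetical `LeaderEpochCache` class that is not part of the Kafka client. As a simplification, it records the first offset at which the consumer observed each epoch, which may be later than the epoch's true start offset on the broker.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

/** Hypothetical in-memory cache of the leader epochs a consumer has observed. */
final class LeaderEpochCache {
    // First offset at which each epoch was observed -> that epoch.
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    /** Record a fetched record; only a newly seen, larger epoch adds an entry. */
    void record(long offset, int epoch) {
        Map.Entry<Long, Integer> last = epochByStartOffset.lastEntry();
        if (last == null || epoch > last.getValue()) {
            epochByStartOffset.put(offset, epoch);
        }
    }

    /**
     * Derive the epoch for an offset from the cache. Returns empty when the
     * cache has no entry at or below the offset, e.g. after a restart.
     */
    OptionalInt epochFor(long offset) {
        Map.Entry<Long, Integer> e = epochByStartOffset.floorEntry(offset);
        return e == null ? OptionalInt.empty() : OptionalInt.of(e.getValue());
    }
}
```

With a populated cache, a lookup by offset alone is enough to recover the epoch, which is the intuition in point 3). A freshly restarted consumer has an empty cache, so the lookup yields nothing and the epoch would have to come from elsewhere, such as the committed offset, which is the gap raised in point 4).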
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> Thanks,
> > > >> > > > > > > > > > > > > > > > >> Dong
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > > > > >>
> > > >> > > > > > > > > > > > > > > > >> > Hi Dong,
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Thanks for considering my suggestions.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Based on your comments, I realized that my suggestion was not complete
> > > >> > > > > > > > > > > > > > > > >> > with regard to KafkaConsumer API vs. consumer-broker protocol. While I
> > > >> > > > > > > > > > > > > > > > >> > propose to keep KafkaConsumer#seek() unchanged and take offset only, the
> > > >> > > > > > > > > > > > > > > > >> > underlying consumer will send the next FetchRequest() to broker with
> > > >> > > > > > > > > > > > > > > > >> > offset and leaderEpoch if it is known (based on leader epoch cache in
> > > >> > > > > > > > > > > > > > > > >> > consumer). Note that this is different from the current KIP, which
> > > >> > > > > > > > > > > > > > > > >> > suggests to always send unknown leader epoch after seek(). This way, if
> > > >> > > > > > > > > > > > > > > > >> > the consumer and a broker agreed on the point of non-divergence, which
> > > >> > > > > > > > > > > > > > > > >> > is some {offset, leaderEpoch} pair, the new leader which causes another
> > > >> > > > > > > > > > > > > > > > >> > truncation (even further back) will be able to detect new divergence and
> > > >> > > > > > > > > > > > > > > > >> > restart the process of finding the new point of non-divergence. So, to
> > > >> > > > > > > > > > > > > > > > >> > answer your question, if the truncation happens just after the user
> > > >> > > > > > > > > > > > > > > > >> > calls KafkaConsumer#findOffsets(offset, leaderEpoch) followed by
> > > >> > > > > > > > > > > > > > > > >> > seek(offset), the user will not seek to the wrong position without
> > > >> > > > > > > > > > > > > > > > >> > knowing that truncation has happened, because the consumer will get
> > > >> > > > > > > > > > > > > > > > >> > another truncation error, and seek again.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > I am afraid I did not understand your second question. Let me summarize
> > > >> > > > > > > > > > > > > > > > >> > my suggestions again, and then give an example to hopefully make my
> > > >> > > > > > > > > > > > > > > > >> > suggestions more clear. Also, the last part of my example shows how the
> > > >> > > > > > > > > > > > > > > > >> > use-case in your first question will work. If it does not answer your
> > > >> > > > > > > > > > > > > > > > >> > second question, would you mind clarifying? I am also focusing on the
> > > >> > > > > > > > > > > > > > > > >> > case of a consumer having enough entries in the cache. The case of
> > > >> > > > > > > > > > > > > > > > >> > restarting from committed offset either stored externally or internally
> > > >> > > > > > > > > > > > > > > > >> > will probably need to be discussed more.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Let me summarize my suggestion again:
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > 1) KafkaConsumer#seek() and KafkaConsumer#position() remain unchanged.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > 2) New KafkaConsumer#findOffsets() takes an {offset, leaderEpoch} pair per
> > > >> > > > > > > > > > > > > > > > >> > topic partition and returns an offset per topic partition.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > 3) FetchRequest() to broker after KafkaConsumer#seek() will contain the
> > > >> > > > > > > > > > > > > > > > >> > offset set by seek and the leaderEpoch that corresponds to the offset
> > > >> > > > > > > > > > > > > > > > >> > based on the leader epoch cache in the consumer.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > The rest of this e-mail is a long and contrived example with several log
> > > >> > > > > > > > > > > > > > > > >> > truncations and unclean leader elections to illustrate the API and your
> > > >> > > > > > > > > > > > > > > > >> > first use-case. Suppose we have three brokers. Initially, Brokers A, B, and
> > > >> > > > > > > > > > > > > > > > >> > C each have one message at offset 0 with leader epoch 0. Then, Broker A
> > > >> > > > > > > > > > > > > > > > >> > goes down for some time. Broker B becomes the leader with epoch 1, and
> > > >> > > > > > > > > > > > > > > > >> > writes messages to offsets 1 and 2. Broker C fetches offset 1, but before
> > > >> > > > > > > > > > > > > > > > >> > fetching offset 2, becomes the leader with leader epoch 2 and writes a
> > > >> > > > > > > > > > > > > > > > >> > message at offset 2. Here is the state of brokers at this point:
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > > Broker A:
> > > >> > > > > > > > > > > > > > > > >> > > offset 0, epoch 0 <- leader
> > > >> > > > > > > > > > > > > > > > >> > > goes down…
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > > Broker B:
> > > >> > > > > > > > > > > > > > > > >> > > offset 0, epoch 0
> > > >> > > > > > > > > > > > > > > > >> > > offset 1, epoch 1 <- leader
> > > >> > > > > > > > > > > > > > > > >> > > offset 2, epoch 1
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > > Broker C:
> > > >> > > > > > > > > > > > > > > > >> > > offset 0, epoch 0
> > > >> > > > > > > > > > > > > > > > >> > > offset 1, epoch 1
> > > >> > > > > > > > > > > > > > > > >> > > offset 2, epoch 2 <- leader
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Before Broker C becomes the leader with leader epoch 2, the consumer
> > > >> > > > > > > > > > > > > > > > >> > consumed the following messages from broker A and broker B:
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2, leaderEpoch=1}.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Consumer’s leader epoch cache at this point contains the following entries:
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > (leaderEpoch=0, startOffset=0)
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > (leaderEpoch=1, startOffset=1)
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > endOffset = 3
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Then, broker B becomes the follower of broker C, truncates and starts
> > > >> > > > > > > > > > > > > > > > >> > fetching from offset 2.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a
> > > >> > > > > > > > > > > > > > > > >> > LOG_TRUNCATION error from broker C.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > In response, the client calls KafkaConsumer#findOffsets(offset=3,
> > > >> > > > > > > > > > > > > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > >> > > > > > > > > > > > > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker C responds with
> > > >> > > > > > > > > > > > > > > > >> > {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3,
> > > >> > > > > > > > > > > > > > > > >> > leaderEpoch=1) returns offset=2.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > In response, consumer calls KafkaConsumer#seek(offset=2) followed by
> > > >> > > > > > > > > > > > > > > > >> > poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to
> > > >> > > > > > > > > > > > > > > > >> > broker C.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > I will continue with this example with the goal to answer your first
> > > >> > > > > > > > > > > > > > > > >> > question about truncation just after findOffsets() followed by seek():
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Suppose, brokers B and C go down, and broker A comes up and becomes a
> > > >> > > > > > > > > > > > > > > > >> > leader with leader epoch 3, and writes a message to offset 1. Suppose,
> > > >> > > > > > > > > > > > > > > > >> > this happens before the consumer gets response from broker C to the
> > > >> > > > > > > > > > > > > > > > >> > previous fetch request: FetchRequest(offset=2, leaderEpoch=1).
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which
> > > >> > > > > > > > > > > > > > > > >> > returns a LOG_TRUNCATION error, because broker A has leader epoch 3, which
> > > >> > > > > > > > > > > > > > > > >> > is larger than the leader epoch in the FetchRequest, and that epoch's
> > > >> > > > > > > > > > > > > > > > >> > starting offset (1) is smaller than offset 2 in the FetchRequest.
> > > >> > > > > > > > > > > > > > > > >> >
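The broker-side check in this step can be sketched roughly as below. This is only an illustration under assumptions: the class TruncationCheckSketch, the method isTruncated, and the map from leader epoch to start offset are hypothetical names, not Kafka's broker code. It reports truncation when the log contains a leader epoch larger than the one in the fetch request that begins at an offset earlier than the fetch offset.

```java
import java.util.Map;
import java.util.TreeMap;

class TruncationCheckSketch {
    /**
     * epochStartOffsets maps each leader epoch present in the broker's log
     * to the first offset written in that epoch (hypothetical structure).
     * Returns true if some epoch larger than fetchEpoch starts before fetchOffset.
     */
    static boolean isTruncated(TreeMap<Integer, Long> epochStartOffsets,
                               long fetchOffset, int fetchEpoch) {
        // Only epochs strictly greater than the epoch in the fetch request matter.
        for (Map.Entry<Integer, Long> e
                : epochStartOffsets.tailMap(fetchEpoch, false).entrySet()) {
            if (e.getValue() < fetchOffset) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Broker A in the example: offset 0 in epoch 0, offset 1 in epoch 3.
        TreeMap<Integer, Long> brokerA = new TreeMap<>();
        brokerA.put(0, 0L);
        brokerA.put(3, 1L);

        System.out.println(isTruncated(brokerA, 2L, 1)); // prints "true"
        System.out.println(isTruncated(brokerA, 1L, 0)); // prints "false"
    }
}
```

With broker A's state from the example, FetchRequest(offset=2, leaderEpoch=1) is flagged, while FetchRequest(offset=1, leaderEpoch=0) is not.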
> > > >> > > > > > > > > > > > > > > > >> > In response, the user calls KafkaConsumer#findOffsets(offset=2,
> > > >> > > > > > > > > > > > > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > >> > > > > > > > > > > > > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), broker A responds with
> > > >> > > > > > > > > > > > > > > > >> > {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch = 0
> > > >> > > > > > > > > > > > > > > > >> > in its cache with end offset == 1, which results in
> > > >> > > > > > > > > > > > > > > > >> > KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset = 1.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > In response, the user calls KafkaConsumer#seek(offset=1) followed by
> > > >> > > > > > > > > > > > > > > > >> > poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to
> > > >> > > > > > > > > > > > > > > > >> > broker A, which responds with the message at offset 1, leader epoch 3.
> > > >> > > > > > > > > > > > > > > > >> >
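One way to read the two findOffsets() calls in the example is sketched below. It is a minimal illustration, not the consumer's actual implementation: EpochCacheSketch, addEpoch, endOffsetFor and resolveOffset are hypothetical names, and the sketch assumes the epoch returned by the broker is already present in the consumer's cache. The returned offset is the smallest of the requested offset, the end offset the broker reports for the epoch, and the end offset the consumer's own cache holds for that epoch.

```java
import java.util.Map;
import java.util.TreeMap;

class EpochCacheSketch {
    // leader epoch -> first offset consumed in that epoch (hypothetical cache layout)
    private final TreeMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private final long endOffset; // one past the last consumed offset

    EpochCacheSketch(long endOffset) {
        this.endOffset = endOffset;
    }

    void addEpoch(int epoch, long startOffset) {
        epochStartOffsets.put(epoch, startOffset);
    }

    // End offset of an epoch in the cache: the start of the next higher epoch,
    // or the overall end offset if this is the latest cached epoch.
    long endOffsetFor(int epoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        return next != null ? next.getValue() : endOffset;
    }

    // brokerEpoch and brokerEndOffset stand for the broker's reply to
    // OffsetsForLeaderEpoch in the example.
    long resolveOffset(long offset, int brokerEpoch, long brokerEndOffset) {
        long localEnd = endOffsetFor(brokerEpoch);
        return Math.min(offset, Math.min(localEnd, brokerEndOffset));
    }

    public static void main(String[] args) {
        // Cache from the example: (epoch 0, start 0), (epoch 1, start 1), endOffset = 3.
        EpochCacheSketch cache = new EpochCacheSketch(3L);
        cache.addEpoch(0, 0L);
        cache.addEpoch(1, 1L);

        // Broker C replied {leaderEpoch=1, endOffset=2} for findOffsets(offset=3, leaderEpoch=1).
        System.out.println(cache.resolveOffset(3L, 1, 2L)); // prints "2"
        // Broker A replied {leaderEpoch=0, endOffset=1} for findOffsets(offset=2, leaderEpoch=1).
        System.out.println(cache.resolveOffset(2L, 0, 1L)); // prints "1"
    }
}
```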
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > I will think some more about consumers restarting from committed offsets,
> > > >> > > > > > > > > > > > > > > > >> > and send a follow up.
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Thanks,
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > Anna
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:
> > > >> > > > > > > > > > > > > > > > >> >
> > > >> > > > > > > > > > > > > > > > >> > > Hey Anna,
> > > >> > > > > > > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > > > > > > >> > > Thanks much for the thoughtful reply. It makes sense to differentiate
> > > >> > > > > > > > > > > > > > > > >> > > between "seeking to a message" and "seeking to a position". I have two
> > > >> > > > > > > > > > > > > > > > >> > > questions here:
> > > >> > > > > > > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > > > > > > >> > > - For "seeking to a message" use-case, with the proposed approach user
> > > >> > > > > > > > > > > > > > > > >> > > needs to call findOffset(offset, leaderEpoch) followed by seek(offset).
> > > >> > > > > > > > > > > > > > > > >> > > If message truncation and message append happen immediately after
> > > >> > > > > > > > > > > > > > > > >> > > findOffset(offset, leaderEpoch) but before seek(offset), it seems that
> > > >> > > > > > > > > > > > > > > > >> > > user will seek to the wrong message without knowing the truncation has
> > > >> > > > > > > > > > > > > > > > >> > > happened. Would this be a problem?
> > > >> > > > > > > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > > > > > > >> > > - For "seeking to a position" use-case, it seems that there can be two
> > > >> > > > > > > > > > > > > > > > >> > > positions, i.e. earliest and latest. So these two cases can be
> > > >> > > > > > > > > > > > > > > > >> > > fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then
> > > >> > > > > > > > > > > > > > > > >> > > it seems that user will only need to call position() and seek() for
> > > >> > > > > > > > > > > > > > > > >> > > "seeking to a message" use-case?
> > > >> > > > > > > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > > > > > > >> > > Thanks,
> > > >> > > > > > > > > > > > > > > > >> > > Dong
> > > >> > > > > > > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > > > > > > >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > > > > > > >> > > > Hi Jason and Dong,
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > I’ve been thinking about your suggestions and discussion regarding
> > > >> > > > > > > > > > > > > > > > >> > > > position(), seek(), and the new proposed API.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > Here is my thought process on why we should keep the position() and
> > > >> > > > > > > > > > > > > > > > >> > > > seek() APIs unchanged.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > I think we should separate {offset, leader epoch} that uniquely
> > > >> > > > > > > > > > > > > > > > >> > > > identifies a message from an offset that is a position. In some cases,
> > > >> > > > > > > > > > > > > > > > >> > > > offsets returned from position() could be actual consumed messages by
> > > >> > > > > > > > > > > > > > > > >> > > > this consumer identified by {offset, leader epoch}. In other cases,
> > > >> > > > > > > > > > > > > > > > >> > > > position() returns offset that was not actually consumed. Suppose, the
> > > >> > > > > > > > > > > > > > > > >> > > > user calls position() for the last offset. Suppose we return {offset,
> > > >> > > > > > > > > > > > > > > > >> > > > leader epoch} of the message currently in the log. Then, the message
> > > >> > > > > > > > > > > > > > > > >> > > > gets truncated before consumer’s first poll(). It does not make sense
> > > >> > > > > > > > > > > > > > > > >> > > > for poll() to fail in this case, because the log truncation did not
> > > >> > > > > > > > > > > > > > > > >> > > > actually happen from the consumer perspective. On the other hand, as
> > > >> > > > > > > > > > > > > > > > >> > > > the KIP proposes, it makes sense for the committed() method to return
> > > >> > > > > > > > > > > > > > > > >> > > > {offset, leader epoch} because those offsets represent actual consumed
> > > >> > > > > > > > > > > > > > > > >> > > > messages.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > The same argument applies to the seek() method: we are not seeking to a
> > > >> > > > > > > > > > > > > > > > >> > > > message, we are seeking to a position.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > I like the proposal to add KafkaConsumer#findOffsets() API. I am
> > > >> > > > > > > > > > > > > > > > >> > > > assuming something like:
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > Similar to seek() and position(), I think findOffsets() should return
> > > >> > > > > > > > > > > > > > > > >> > > > an offset without leader epoch, because what we want is the offset that
> > > >> > > > > > > > > > > > > > > > >> > > > we think is closest to the non-divergent message from the given
> > > >> > > > > > > > > > > > > > > > >> > > > consumed message. Until the consumer actually fetches the message, we
> > > >> > > > > > > > > > > > > > > > >> > > > should not let the consumer store the leader epoch for a message it did
> > > >> > > > > > > > > > > > > > > > >> > > > not consume.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > So, the workflow will be:
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > 1) The user gets LogTruncationException with {offset, leader epoch of
> > > >> > > > > > > > > > > > > > > > >> > > > the previous message} (whatever we send with new FetchRecords request).
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > 2) offset = findOffsets(tp -> {offset, leader epoch})
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > 3) seek(offset)
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > For the use-case where the users store committed offsets externally:
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > 1) Such users would have to track the leader epoch together with an
> > > >> > > > > > > > > > > > > > > > >> > > > offset. Otherwise, there is no way to detect later what leader epoch
> > > >> > > > > > > > > > > > > > > > >> > > > was associated with the message. I think it’s reasonable to ask that
> > > >> > > > > > > > > > > > > > > > >> > > > from users if they want to detect log truncation. Otherwise, they will
> > > >> > > > > > > > > > > > > > > > >> > > > get the current behavior.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > If the users currently get an offset to be stored using position(), I
> > > >> > > > > > > > > > > > > > > > >> > > > see two possibilities. First, they save the offset returned from
> > > >> > > > > > > > > > > > > > > > >> > > > position() that they call before poll(). In that case, it would not be
> > > >> > > > > > > > > > > > > > > > >> > > > correct to store {offset, leader epoch} if we changed position() to
> > > >> > > > > > > > > > > > > > > > >> > > > return {offset, leader epoch}, since the actual fetched message could
> > > >> > > > > > > > > > > > > > > > >> > > > be different (from the example I described earlier). So, it would be
> > > >> > > > > > > > > > > > > > > > >> > > > more correct to call position() after poll(). However, the user already
> > > >> > > > > > > > > > > > > > > > >> > > > gets ConsumerRecords at this point, from which the user can extract
> > > >> > > > > > > > > > > > > > > > >> > > > {offset, leader epoch} of the last message.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > So, I like the idea of adding a helper method to ConsumerRecords, as
> > > >> > > > > > > > > > > > > > > > >> > > > Jason proposed, something like:
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch
> > > >> > > > > > > > > > > > > > > > >> > > > is a data struct holding {offset, leader epoch}.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > In this case, we would advise the user to follow the workflow: poll(), get
> > > >> > > > > > > > > > > > > > > > >> > > > {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
> > > >> > > > > > > > > > > > > > > > >> > > > save offset and leader epoch, process records.
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > 2) When the user needs to seek to the last committed offset, they call new
> > > >> > > > > > > > > > > > > > > > >> > > > findOffsets(saved offset, leader epoch), and then seek(offset).
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > What do you think?
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > Thanks,
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > Anna
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >> > > > > > > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > Hey Jason,
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > Thanks much for your thoughtful explanation.
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > Yes, the solution using findOffsets(offset, leaderEpoch) also works. The
> > > >> > > > > > > > > > > > > > > > >> > > > > advantage of this solution is that it adds only one API instead of two.
> > > >> > > > > > > > > > > > > > > > >> > > > > The concern is that its usage seems a bit more clumsy for advanced users.
> > > >> > > > > > > > > > > > > > > > >> > > > > More specifically, advanced users who store offsets externally will
> > > >> > > > > > > > > > > > > > > > >> > > > > always need to call findOffsets() before calling seek(offset) during
> > > >> > > > > > > > > > > > > > > > >> > > > > consumer initialization. And those advanced users will need to manually
> > > >> > > > > > > > > > > > > > > > >> > > > > keep track of the leaderEpoch of the last ConsumerRecord.
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > The other solution, which may be more user-friendly for advanced users,
> > > >> > > > > > > > > > > > > > > > >> > > > > is to add two APIs, `void seek(offset, leaderEpoch)` and
> > > >> > > > > > > > > > > > > > > > >> > > > > `(offset, epoch) = offsetEpochs(topicPartition)`.
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > I kind of prefer the second solution because it is easier to use for
> > > >> > > > > > > > > > > > > > > > >> > > > > advanced users. If we need to expose leaderEpoch anyway to safely
> > > >> > > > > > > > > > > > > > > > >> > > > > identify a message, it may be conceptually simpler to expose it directly
> > > >> > > > > > > > > > > > > > > > >> > > > > in seek(...) rather than requiring one more translation using
> > > >> > > > > > > > > > > > > > > > >> > > > > findOffsets(...). But I am also OK with the first solution if other
> > > >> > > > > > > > > > > > > > > > >> > > > > developers also favor that one :)
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > Thanks,
> > > >> > > > > > > > > > > > > > > > >> > > > > Dong
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > Hi Dong,
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > Thanks, I've been thinking about your suggestions a bit. It is
> > > >> > > > > > > > > > > > > > > > >> > > > > > challenging to make this work given the current APIs. One of the
> > > >> > > > > > > > > > > > > > > > >> > > > > > difficulties is that we don't have an API to find the leader epoch for
> > > >> > > > > > > > > > > > > > > > >> > > > > > a given offset at the moment. So if the user does a seek to offset 5,
> > > >> > > > > > > > > > > > > > > > >> > > > > > then we'll need a new API to find the corresponding epoch in order to
> > > >> > > > > > > > > > > > > > > > >> > > > > > fulfill the new position() API. Potentially we could modify ListOffsets
> > > >> > > > > > > > > > > > > > > > >> > > > > > to enable finding the leader epoch, but I am not sure it is worthwhile.
> > > >> > > > > > > > > > > > > > > > >> > > > > > Perhaps it is reasonable for advanced usage to expect that the epoch
> > > >> > > > > > > > > > > > > > > > >> > > > > > information, if needed, will be extracted from the records directly? It
> > > >> > > > > > > > > > > > > > > > >> > > > > > might make sense to expose a helper in `ConsumerRecords` to make this a
> > > >> > > > > > > > > > > > > > > > >> > > > > > little easier though.
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > Alternatively, if we think it is important to have this information
> > > >> > > > > > > > > > > > > > > > >> > > > > > exposed directly, we could create batch APIs to solve the naming
> > > >> > > > > > > > > > > > > > > > >> > > > > > problem. For example:
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > Map<TopicPartition, OffsetAndEpoch> positions();
> > > >> > > > > > > > > > > > > > > > >> > > > > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > However, I'm actually leaning toward leaving the seek() and position()
> > > >> > > > > > > > > > > > > > > > >> > > > > > APIs unchanged. Instead, we can add a new API to search for offset by
> > > >> > > > > > > > > > > > > > > > >> > > > > > timestamp or by offset/leader epoch. Let's say we call it `findOffsets`.
> > > >> > > > > > > > > > > > > > > > >> > > > > > If the user hits a log truncation error, they can use this API to find
> > > >> > > > > > > > > > > > > > > > >> > > > > > the closest offset and then do a seek(). At the same time, we deprecate
> > > >> > > > > > > > > > > > > > > > >> > > > > > the `offsetsForTimes` APIs. We now have two use cases which require
> > > >> > > > > > > > > > > > > > > > >> > > > > > finding offsets, so I think we should make this API general and leave
> > > >> > > > > > > > > > > > > > > > >> > > > > > the door open for future extensions.
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > By the way, I'm unclear about the desire to move part of this
> > > >> > > > > > > > > > > > > > > > >> > > > > > functionality to AdminClient. Guozhang suggested this previously, but I
> > > >> > > > > > > > > > > > > > > > >> > > > > > think it only makes sense for cross-cutting capabilities such as topic
> > > >> > > > > > > > > > > > > > > > >> > > > > > creation. If we have an API which is primarily useful to consumers,
> > > >> > > > > > > > > > > > > > > > >> > > > > > then I think that's where it should be exposed. The AdminClient also
> > > >> > > > > > > > > > > > > > > > >> > > > > > has its own API integrity and should not become a dumping ground for
> > > >> > > > > > > > > > > > > > > > >> > > > > > advanced use cases. I'll update the KIP with the `findOffsets` API
> > > >> > > > > > > > > > > > > > > > >> > > > > > suggested above and we can see if it does a good enough job of keeping
> > > >> > > > > > > > > > > > > > > > >> > > > > > the API simple for common cases.
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > Thanks,
> > > >> > > > > > > > > > > > > > > > >> > > > > > Jason
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > >> > > > > > > > > > > > > > > > >> > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > Hey Jason,
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > Regarding seek(...), it seems that we want an API for the user to
> > > >> > > > > > > > > > > > > > > > >> > > > > > > initialize the consumer with (offset, leaderEpoch), and that API should
> > > >> > > > > > > > > > > > > > > > >> > > > > > > allow throwing PartitionTruncationException. Suppose we agree on this,
> > > >> > > > > > > > > > > > > > > > >> > > > > > > then seekToNearest() is not sufficient because it will always swallow
> > > >> > > > > > > > > > > > > > > > >> > > > > > > PartitionTruncationException. Here we have two options. The first
> > > >> > > > > > > > > > > > > > > > >> > > > > > > option is to add the API offsetsForLeaderEpochs() to translate
> > > >> > > > > > > > > > > > > > > > >> > > > > > > (leaderEpoch, offset) to offset. The second option is to add
> > > >> > > > > > > > > > > > > > > > >> > > > > > > seek(offset, leaderEpoch). It seems that the second option may be
> > > >> > > > > > > > > > > > > > > > >> > > > > > > simpler because it makes it clear that (offset, leaderEpoch) will be
> > > >> > > > > > > > > > > > > > > > >> > > > > > > used to identify the consumer's position in a partition. And the user
> > > >> > > > > > > > > > > > > > > > >> > > > > > > only needs to handle PartitionTruncationException from poll(). In
> > > >> > > > > > > > > > > > > > > > >> > > > > > > comparison, the first option seems a bit harder to use because the
> > > >> > > > > > > > > > > > > > > > >> > > > > > > user also has to handle the PartitionTruncationException if
> > > >> > > > > > > > > > > > > > > > >> > > > > > > offsetsForLeaderEpochs() returns a different offset from the
> > > >> > > > > > > > > > > > > > > > >> > > > > > > user-provided offset. What do you think?
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > If we decide to add the API seek(offset, leaderEpoch), then we can
> > > >> > > > > > > > > > > > > > > > >> > > > > > > decide whether and how to add an API to translate (offset, leaderEpoch)
> > > >> > > > > > > > > > > > > > > > >> > > > > > > to offset. It seems that this API will be needed by advanced users who
> > > >> > > > > > > > > > > > > > > > >> > > > > > > don't want auto offset reset (so that they can be notified) but still
> > > >> > > > > > > > > > > > > > > > >> > > > > > > want to reset the offset to the closest one. For those users it
> > > >> > > > > > > > > > > > > > > > >> > > > > > > probably makes sense to only have the API in AdminClient.
> > > >> > > > > > > > > > > > > > > > >> > > > > > > offsetsForTimes() seems like a common API that will be needed by users
> > > >> > > > > > > > > > > > > > > > >> > > > > > > of the consumer in general, so it may be more reasonable for it to stay
> > > >> > > > > > > > > > > > > > > > >> > > > > > > in the consumer API. I don't have a strong opinion on whether
> > > >> > > > > > > > > > > > > > > > >> > > > > > > offsetsForTimes() should be replaced by an API in AdminClient.
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > Though (offset, leaderEpoch) is needed to uniquely identify a message
> > > >> > > > > > > > > > > > > > > > >> > > > > > > in general, it is only needed for advanced users who have turned on
> > > >> > > > > > > > > > > > > > > > >> > > > > > > unclean leader election, need to use seek(..), and don't want auto
> > > >> > > > > > > > > > > > > > > > >> > > > > > > offset reset. Most other users probably just want to enable auto offset
> > > >> > > > > > > > > > > > > > > > >> > > > > > > reset and store offsets in Kafka. Thus we might want to keep the
> > > >> > > > > > > > > > > > > > > > >> > > > > > > existing offset-only APIs (e.g. seek() and position()) for most users
> > > >> > > > > > > > > > > > > > > > >> > > > > > > while adding new APIs for advanced users. And yes, it seems that we
> > > >> > > > > > > > > > > > > > > > >> > > > > > > need a new name for position().
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > Though I think we need new APIs to carry the new information (e.g.
> > > >> > > > > > > > > > > > > > > > >> > > > > > > leaderEpoch), I am not very sure what they should look like. One
> > > >> > > > > > > > > > > > > > > > >> > > > > > > possible option is those APIs in KIP-232. Another option is something
> > > >> > > > > > > > > > > > > > > > >> > > > > > > like this:
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > `````
> > > >> > > > > > > > > > > > > > > > >> > > > > > > class OffsetEpochs {
> > > >> > > > > > > > > > > > > > > > >> > > > > > >   long offset;
> > > >> > > > > > > > > > > > > > > > >> > > > > > >   int leaderEpoch;
> > > >> > > > > > > > > > > > > > > > >> > > > > > >   int partitionEpoch;   // This may be needed later as discussed in KIP-232
> > > >> > > > > > > > > > > > > > > > >> > > > > > >   ... // Hopefully these are all we need to identify message in Kafka. But
> > > >> > > > > > > > > > > > > > > > >> > > > > > >       // if we need more then we can add new fields in this class.
> > > >> > > > > > > > > > > > > > > > >> > > > > > > }
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > OffsetEpochs offsetEpochs(TopicPartition);
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > void seek(TopicPartition, OffsetEpochs);
> > > >> > > > > > > > > > > > > > > > >> > > > > > > ``````
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > Thanks,
> > > >> > > > > > > > > > > > > > > > >> > > > > > > Dong
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > >> > > > > > > > > > > > > > > > >> > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > > Hey Dong,
> > > >> > > > > > > > > > > > > > > > >> > > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > > Thanks for the feedback. The first three points are easy:
> > > >> > > > > > > > > > > > > > > > >> > > > > > > >
> > > >> > > > > > > > > > > > > > > > >> > > > > > > > 1. Yes, we should be consistent.
> > > >> > > > > > > > > > > > > > > > >> > > > > > > > 2. Yes, I will add this.
> > > >> > > > > > > > > > > > > > > > >> > > > > > > > 3. Yes, I think we should document the changes to the committed offset
> > > >> > > > > > > > > > > > > > > > >> > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
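
For reference, the findOffsets-then-seek workflow discussed in the quoted
thread can be sketched roughly as follows. This is only an illustration under
stated assumptions, not the KIP's API: OffsetAndEpoch, EpochHistory,
isTruncated and findOffset are hypothetical names, and the epoch history is a
toy in-memory map rather than anything fetched from a broker. The truncation
check follows the phrasing proposed earlier in this mail (a larger leader
epoch that begins before the saved offset).

```java
import java.util.Map;
import java.util.TreeMap;

public class FindOffsetsSketch {
    /** Hypothetical holder for {offset, leader epoch}, named after the struct in the thread. */
    static final class OffsetAndEpoch {
        final long offset;
        final int leaderEpoch;

        OffsetAndEpoch(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /** Toy model of a partition's history: leader epoch -> first offset written in that epoch. */
    static final class EpochHistory {
        private final TreeMap<Integer, Long> startOffsets = new TreeMap<>();

        void addEpoch(int leaderEpoch, long startOffset) {
            startOffsets.put(leaderEpoch, startOffset);
        }

        /** Truncation is detected if some larger epoch begins before the saved offset. */
        boolean isTruncated(OffsetAndEpoch saved) {
            for (long start : startOffsets.tailMap(saved.leaderEpoch, false).values()) {
                if (start < saved.offset) {
                    return true;
                }
            }
            return false;
        }

        /** Closest offset still valid for the saved position (assumed semantics). */
        long findOffset(OffsetAndEpoch saved) {
            // The saved epoch ends where the next larger epoch starts, if there is one.
            Map.Entry<Integer, Long> next = startOffsets.higherEntry(saved.leaderEpoch);
            return next == null ? saved.offset : Math.min(saved.offset, next.getValue());
        }
    }

    public static void main(String[] args) {
        EpochHistory history = new EpochHistory();
        history.addEpoch(0, 0L);
        history.addEpoch(1, 8L); // a new leader took over and started writing at offset 8

        // Position saved externally while epoch 0 was still the leader epoch.
        OffsetAndEpoch saved = new OffsetAndEpoch(10L, 0);

        if (history.isTruncated(saved)) {
            // In the proposed workflow this is where the user would call seek(target).
            long target = history.findOffset(saved);
            System.out.println("truncated; seek to " + target);
        }
    }
}
```

With a real consumer the history would come from the broker; the sketch only
shows why the saved leader epoch has to travel together with the offset.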
