Hey Anna,
Thanks much for the thoughtful reply. It makes sense to differentiate between
"seeking to a message" and "seeking to a position". I have two questions
here:
- For the "seeking to a message" use-case, with the proposed approach the user
needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If
message truncation and message append happen immediately after
findOffset(offset, leaderEpoch) but before seek(offset), it seems that the user
will seek to the wrong message without knowing that the truncation has
happened. Would this be a problem?
- For the "seeking to a position" use-case, it seems that there can be only
two positions, i.e. earliest and latest. So these two cases can be
fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it
seems that the user will only need to call position() and seek() for the
"seeking to a message" use-case?
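To make the race in the first question concrete, here is a toy Java model. It is not the Kafka client; `SeekRaceSketch`, `Entry`, `epochAt` and `truncateAndAppend` are hypothetical names. The consumer last consumed offset 7 in epoch 1, the lookup reports offset 8 as safe, and a truncation plus append lands before seek(8).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single partition log; this is not the Kafka client.
class SeekRaceSketch {
    // Offset that the hypothetical findOffset(8, 1) lookup reports as safe.
    static final long TARGET = 8;

    // One message: its offset and the leader epoch it was written in.
    static final class Entry {
        final long offset;
        final int epoch;

        Entry(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    final List<Entry> log = new ArrayList<>();

    // Epoch of the message currently stored at `offset`, or -1 if there is none.
    int epochAt(long offset) {
        for (Entry e : log) {
            if (e.offset == offset) {
                return e.epoch;
            }
        }
        return -1;
    }

    // Simulates a leader change: drop everything from `from` onwards, then
    // append `count` new messages written under `newEpoch`.
    void truncateAndAppend(long from, int count, int newEpoch) {
        log.removeIf(e -> e.offset >= from);
        for (int i = 0; i < count; i++) {
            log.add(new Entry(from + i, newEpoch));
        }
    }

    static SeekRaceSketch demo() {
        SeekRaceSketch p = new SeekRaceSketch();
        for (long o = 0; o < 10; o++) {
            p.log.add(new Entry(o, 1)); // offsets 0..9 written in epoch 1
        }
        // The lookup has returned TARGET; before seek(TARGET) runs, a new
        // leader truncates at offset 5 and appends offsets 5..9 in epoch 2.
        p.truncateAndAppend(5, 5, 2);
        return p;
    }

    public static void main(String[] args) {
        SeekRaceSketch p = demo();
        // seek(TARGET) carries no epoch, so nothing flags that the message at
        // TARGET now comes from epoch 2 rather than epoch 1.
        System.out.println("epoch at offset " + TARGET + ": " + p.epochAt(TARGET));
    }
}
```

Because the offset handed to seek() has no epoch attached, the next fetch at offset 8 succeeds silently even though it now reads an epoch-2 message.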
Thanks,
Dong
On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner wrote:
> Hi Jason and Dong,
>
>
> I’ve been thinking about your suggestions and discussion regarding
> position(), seek(), and new proposed API.
>
>
> Here is my thought process on why we should keep the position() and seek()
> APIs unchanged.
>
>
> I think we should separate {offset, leader epoch}, which uniquely identifies
> a message, from an offset that is a position. In some cases, the offset
> returned from position() could correspond to a message actually consumed
> by this consumer and identified by {offset, leader epoch}. In other cases,
> position() returns an offset that was not actually consumed. Suppose the
> user calls position() for the last offset, and suppose we return the
> {offset, leader epoch} of the message currently in the log. Then the
> message gets truncated before the consumer’s first poll(). It does not make
> sense for poll() to fail in this case, because from the consumer’s
> perspective the log truncation did not actually happen. On the other hand,
> as the KIP proposes, it makes sense for the committed() method to return
> {offset, leader epoch} because those offsets represent actually consumed
> messages.
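A toy Java sketch of the distinction above, assuming a truncation check that uses only the leader's end offset for an epoch. This is not the Kafka client: `PositionVsMessageSketch`, `OffsetAndEpoch` as written here, `lastConsumed` and `truncationDetected` are hypothetical. The position is a plain offset; only a message the consumer actually consumed carries an epoch that a divergence check can use.

```java
import java.util.function.IntToLongFunction;

// Toy model; not the Kafka client. Names are illustrative only.
class PositionVsMessageSketch {
    // {offset, leader epoch}: identifies a message the consumer actually consumed.
    static final class OffsetAndEpoch {
        final long offset;
        final int leaderEpoch;

        OffsetAndEpoch(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    long position;               // what position() would return: no epoch
    OffsetAndEpoch lastConsumed; // set only once poll() actually returns a record

    // endOffsetForEpoch maps an epoch to the leader's (exclusive) end offset
    // for that epoch. A consumed message at offset o in epoch e was truncated
    // if that end offset is <= o. With nothing consumed yet, the consumer has
    // seen nothing that could have been truncated, so there is no failure.
    boolean truncationDetected(IntToLongFunction endOffsetForEpoch) {
        if (lastConsumed == null) {
            return false;
        }
        return endOffsetForEpoch.applyAsLong(lastConsumed.leaderEpoch) <= lastConsumed.offset;
    }
}
```

For example, with `position = 10` taken before the first poll() and offset 9 later truncated (so epoch 1 now ends at 9), `truncationDetected(e -> 9L)` stays false until the consumer has actually consumed a message at offset 9.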
>
>
> The same argument applies to the seek() method — we are not seeking to a
> message, we are seeking to a position.
>
>
> I like the proposal to add a KafkaConsumer#findOffsets() API. I am assuming
> something like:
>
> Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch>
> offsetsToSearch)
>
> Similar to seek() and position(), I think findOffsets() should return an
> offset without a leader epoch, because what we want is the offset of the
> message closest to the given consumed message that has not diverged. Until
> the consumer actually fetches the message, we should not let the consumer
> store the leader epoch for a message it did not consume.
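One way to picture what findOffsets() might compute for a single partition, as a sketch under assumptions: the leader's epoch history is available as a map from epoch to the first offset written in that epoch, and the end of an epoch is the start of the next later epoch (or the log end offset). `FindOffsetSketch`, `endOffsetForEpoch` and `findOffset` are hypothetical names, and the result is a plain offset, matching the point above.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of a findOffsets()-style lookup for one partition; not the Kafka client.
class FindOffsetSketch {
    // Leader's (exclusive) end offset for `epoch`: the start offset of the
    // first later epoch, or the log end offset if no later epoch exists.
    static long endOffsetForEpoch(TreeMap<Integer, Long> epochStartOffsets,
                                  long logEndOffset, int epoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        return next == null ? logEndOffset : next.getValue();
    }

    // Returns only an offset, no epoch: the requested offset if the leader's
    // log has not diverged before it, otherwise the offset where it diverged.
    static long findOffset(TreeMap<Integer, Long> epochStartOffsets,
                           long logEndOffset, long offset, int leaderEpoch) {
        return Math.min(offset, endOffsetForEpoch(epochStartOffsets, logEndOffset, leaderEpoch));
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> history = new TreeMap<>();
        history.put(1, 0L); // epoch 1 started at offset 0
        history.put(2, 5L); // epoch 2 started at offset 5
        // Fetch offset 8, previous message written in epoch 1: on the leader
        // epoch 1 ended at offset 5, so offsets 5..7 diverged.
        System.out.println(findOffset(history, 12, 8, 1)); // 5
        // Same fetch offset, previous message in epoch 2: no divergence.
        System.out.println(findOffset(history, 12, 8, 2)); // 8
    }
}
```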
>
>
> So, the workflow will be:
>
> 1) The user gets a LogTruncationException with {offset, leader epoch of the
> previous message} (whatever we send with the new FetchRecords request).
>
> 2) offset = findOffsets(tp -> {offset, leader epoch})
>
> 3) seek(offset)
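The three steps above could look roughly like this in Java. This is a sketch only: `LogTruncationException`, `findOffsets` and `OffsetAndEpoch` are the proposed names from this thread, not an existing client API, and the `Consumer` interface here is a hypothetical single-partition stand-in (the proposal itself takes a map keyed by partition). A fake consumer is included so the flow can be run.

```java
import java.util.ArrayList;
import java.util.List;

class TruncationWorkflowSketch {
    static final class OffsetAndEpoch {
        final long offset;
        final int leaderEpoch;

        OffsetAndEpoch(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Hypothetical exception from step 1: carries the fetch offset and the
    // leader epoch of the previously consumed message.
    static final class LogTruncationException extends RuntimeException {
        final OffsetAndEpoch divergent;

        LogTruncationException(OffsetAndEpoch divergent) {
            super("log truncation detected at offset " + divergent.offset);
            this.divergent = divergent;
        }
    }

    // Hypothetical single-partition slice of the consumer API under discussion.
    interface Consumer {
        List<Long> poll();                               // may throw LogTruncationException
        long findOffsets(OffsetAndEpoch offsetAndEpoch); // returns an offset only, no epoch
        void seek(long offset);
    }

    // Steps 1-3; if the retried poll() throws again, the exception propagates.
    static List<Long> pollHandlingTruncation(Consumer consumer) {
        try {
            return consumer.poll();
        } catch (LogTruncationException e) {
            long offset = consumer.findOffsets(e.divergent); // step 2
            consumer.seek(offset);                           // step 3
            return consumer.poll();
        }
    }

    // Fake consumer: the first poll() reports truncation at offset 8 (epoch 1),
    // and the lookup says the log diverged at offset 5.
    static final class FakeConsumer implements Consumer {
        long position = 8;
        boolean truncationPending = true;

        public List<Long> poll() {
            if (truncationPending) {
                truncationPending = false;
                throw new LogTruncationException(new OffsetAndEpoch(position, 1));
            }
            List<Long> offsets = new ArrayList<>();
            offsets.add(position);
            offsets.add(position + 1);
            position += 2;
            return offsets;
        }

        public long findOffsets(OffsetAndEpoch oe) {
            return Math.min(oe.offset, 5L);
        }

        public void seek(long offset) {
            position = offset;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollHandlingTruncation(new FakeConsumer())); // [5, 6]
    }
}
```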
>
>
> For the use-case where the users store committed offsets externally:
>
> 1) Such users would have to track the leader epoch together with an offset.
> Otherwise, there is no way to detect later what leader epoch was associated
> with the message. I think it’s reasonable to ask that of users if they
> want to detect log truncation. Otherwise, they will get the current
> behavior.
>
>
> If users currently get the offset to be stored using position(), I see
> two possibilities. First, they save the offset returned from a position()
> call made before poll(). In that case, it would not be correct to store
> {offset, leader epoch} if we had changed position() to return
> {offset, leader epoch}, since the actually fetched message could be
> different (as in the example I described earlier). So, it would be more
> correct to call position() after poll(). However, the user already has the
> ConsumerRecords at that point, from which the user can extract the
> {offset, leader epoch} of the last message.
>
>
> So, I like the idea of adding a helper method to ConsumerRecords, as Jason
> proposed, something like:
>
> public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is
> a data struct holding {offset, leader epoch}.
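A minimal sketch of such a helper and the poll, save, process order, for one partition's records. `Rec` is a hypothetical stand-in for a fetched record, the map stands in for a user's external store, and returning `Optional` for an empty batch is a choice made here (the signature in the thread returns `OffsetAndEpoch` directly).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class LastOffsetSketch {
    static final class OffsetAndEpoch {
        final long offset;
        final int leaderEpoch;

        OffsetAndEpoch(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }

        @Override
        public String toString() {
            return "{offset=" + offset + ", leaderEpoch=" + leaderEpoch + "}";
        }
    }

    // Hypothetical stand-in for one fetched record of a single partition.
    static final class Rec {
        final long offset;
        final int leaderEpoch;
        final String value;

        Rec(long offset, int leaderEpoch, String value) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.value = value;
        }
    }

    // {offset, leader epoch} of the last record in the batch, if any.
    static Optional<OffsetAndEpoch> lastOffsetWithLeaderEpoch(List<Rec> records) {
        if (records.isEmpty()) {
            return Optional.empty();
        }
        Rec last = records.get(records.size() - 1);
        return Optional.of(new OffsetAndEpoch(last.offset, last.leaderEpoch));
    }

    public static void main(String[] args) {
        List<Rec> batch = new ArrayList<>(); // pretend this came from poll()
        batch.add(new Rec(41, 3, "a"));
        batch.add(new Rec(42, 3, "b"));
        Map<String, OffsetAndEpoch> externalStore = new HashMap<>(); // hypothetical store
        // Save {offset, leader epoch} first, then process the records.
        lastOffsetWithLeaderEpoch(batch).ifPresent(oe -> externalStore.put("my-topic-0", oe));
        batch.forEach(r -> System.out.println("processing " + r.value));
        System.out.println(externalStore.get("my-topic-0")); // {offset=42, leaderEpoch=3}
    }
}
```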
>
>
> In this case, we would advise the user to follow this workflow: poll(),
> get {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
> save the offset and leader epoch, then process the records.
>
>
> 2) When the user needs to seek to the last committed offset, they call the
> new findOffsets(saved offset, leader epoch), and then seek(offset).
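The restart path in step 2 might look like the sketch below, assuming the pair is stored externally as an "offset:epoch" string. That encoding is an arbitrary choice made here, and because findOffsets() and seek() are only proposed, they are passed in as functions rather than called on a real consumer. `RestartSketch`, `encode` and `restore` are hypothetical names.

```java
import java.util.function.LongConsumer;
import java.util.function.ToLongBiFunction;

class RestartSketch {
    // Encode {offset, leader epoch} for an external store, e.g. "42:3".
    static String encode(long offset, int leaderEpoch) {
        return offset + ":" + leaderEpoch;
    }

    // Parse the saved pair, ask the (hypothetical) findOffsets for a safe
    // offset, seek there, and return the offset that was sought.
    static long restore(String saved,
                        ToLongBiFunction<Long, Integer> findOffsets,
                        LongConsumer seek) {
        String[] parts = saved.split(":");
        long offset = Long.parseLong(parts[0]);
        int leaderEpoch = Integer.parseInt(parts[1]);
        long target = findOffsets.applyAsLong(offset, leaderEpoch);
        seek.accept(target);
        return target;
    }
}
```

For example, `restore(encode(42, 3), (o, e) -> Math.min(o, 40L), x -> {})` would seek to 40 if the stand-in lookup reports that the log diverged at offset 40.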
>
>
> What do you think?
>
>
> Thanks,
>
> Anna
>
>
> On Tue, Jul 3, 2018 at 4:06 PM Dong Lin wrote:
>
> > Hey Jason,
> >
> > Thanks much for your thoughtful explanation.
> >
> > Yes, the solution using findOffsets(offset, leaderEpoch) also works. The
> > advantage of this solution is that it adds only one API instead of two
> > APIs. The concern is that its usage seems a bit more clumsy for advanced
> > users. More specifically, advanced users who store offsets externally
> > will alw