[
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723156#comment-16723156
]
Viktor Somogyi edited comment on KAFKA-7703 at 12/17/18 4:55 PM:
-----------------------------------------------------------------
[~zsxwing], [~dongjoon], I've looked into the issue and it seems there is no
easy fix for this in the code as it designed to be async, so it might take some
time. Even if we make the method atomic the offset reset that arrives later
will be discarded as the first reset nulls out the resetStrategy in
SubscriptionState which triggers the {{else if
(!subscriptions.isOffsetResetNeeded(partition)}}
[check|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L579]
to skip the offset reset.
In the current situation as a workaround you could put a {{position}} call
between {{poll}} and {{seekToEnd}} as it blocks until {{poll}} returns or some
error happens.
was (Author: viktorsomogyi):
[~zsxwing], [~dongjoon], I've looked into the issue and it seems there is no
easy fix for this in the code as it designed to be async, so it might take some
time. Even if we make the method atomic the offset reset that arrives later
will be discarded as the first reset nulls out the resetStrategy in
SubscriptionState which triggers the {{else if
(!subscriptions.isOffsetResetNeeded(partition)}}
[check|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L579]
to skip the offset reset.
In the current situation you could put a {{position}} call between {{poll}} and
{{seekToEnd}} as it blocks until {{poll}} returns or some error happens.
> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> ----------------------------------------------------------------------------
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
> Reporter: Shixiong Zhu
> Assignee: Viktor Somogyi
> Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong
> offset set by another reset request.
> Here is a reproducer:
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background.
> However, after "seekToEnd" is called, due to a race condition in
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen
> between the check
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
> and the seek
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
> "KafkaConsumer.position" may return an "earliest" offset.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)