[ https://issues.apache.org/jira/browse/KAFKA-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940189#comment-17940189 ]
Kirk True edited comment on KAFKA-19067 at 4/1/25 11:31 PM:
------------------------------------------------------------

I noticed that [some of the seek methods in SubscriptionState are synchronized while others are not|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L426-L436]. If we added {{synchronized}} to {{seekUnvalidated()}}, which is called by the background thread when the user seeks, we would keep the {{FetchState}} from existing in that interim state.

was (Author: kirktrue):
I noticed that [some of the seek methods in SubscriptionState are synchronized while others are not|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L426-L436].

> AsyncKafkaConsumer may return stale fetch result after seek operation
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-19067
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19067
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: PoAn Yang
>            Assignee: PoAn Yang
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> The KafkaConsumer sends a FetchRequest after it subscribes to topics. The
> FetchResponse data is stored in the FetchBuffer. For a KafkaConsumer#seek
> operation, the FetchState changes to AWAIT_RESET and the consumer sends a
> LIST_OFFSETS request. The state changes back to FETCHING after the consumer
> receives the LIST_OFFSETS response.
> If a KafkaConsumer subscribes to topics and then calls seek, there may be
> stale FetchResponse data in the FetchBuffer. ClassicKafkaConsumer#poll gets
> data from the FetchBuffer first and then calls ConsumerNetworkClient#poll.
> If there is stale data in the FetchBuffer, it is ignored because the
> FetchState is in AWAIT_RESET. The FetchState in the ClassicKafkaConsumer
> changes back to FETCHING after ConsumerNetworkClient#poll receives the
> LIST_OFFSETS response.
> However, the AsyncKafkaConsumer may return stale FetchResponse data to
> users, because the ConsumerNetworkThread runs in a separate thread. The
> FetchState may change back to FETCHING before AsyncKafkaConsumer#poll
> performs the valid position check.
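A minimal reproduction sketch of the scenario described above. The assumptions here are mine, not from the report: a local broker at localhost:9092, a single-partition topic named "topic" that already holds offsets 0–9, and a client version where {{group.protocol=consumer}} selects the AsyncKafkaConsumer.

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaleFetchAfterSeekRepro {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        // Assumption: group.protocol=consumer selects the AsyncKafkaConsumer; use "classic"
        // instead to compare against the ClassicKafkaConsumer behaviour shown in the logs.
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic")); // assumption: "topic" already holds offsets 0..9

            // First poll joins the group and lets the background thread send a FETCH at offset 0.
            consumer.poll(Duration.ofSeconds(1));

            // Seek while that FETCH response may already be sitting in the FetchBuffer.
            consumer.seekToBeginning(consumer.assignment());

            // Expected: records starting again at offset 0. With the AsyncKafkaConsumer, the
            // stale pre-seek batch can be returned here, and the next FETCH starts at offset 10.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r ->
                System.out.printf("partition=%d offset=%d value=%s%n", r.partition(), r.offset(), r.value()));
        }
    }
}
{code}

Run with the classic consumer, the buffered pre-seek batch should instead be discarded ("Ignoring fetched records ... since it no longer has valid position"), matching the logs below.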
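For illustration, a rough sketch of the change suggested in the comment above, adding {{synchronized}} to {{seekUnvalidated()}}. This is a fragment, not a verbatim excerpt of trunk; the method body, the {{FetchPosition}} parameter, and the {{assignedState()}} delegation are assumptions based on the linked lines.

{code:java}
// Sketch only -- assumed shape of SubscriptionState#seekUnvalidated(), shown with the
// proposed synchronized keyword added so it matches the neighbouring seek methods.
public synchronized void seekUnvalidated(TopicPartition tp, FetchPosition position) {
    // Holding the SubscriptionState monitor here keeps the background thread's position
    // update and FetchState transition from interleaving with the application thread's
    // checks, closing the window in which stale buffered fetch data can pass the
    // valid-position check.
    assignedState(tp).seekUnvalidated(position);
}
{code}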
> The following logs show the case for ClassicKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1, correlationId=12, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-1, groupId=group] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, clientId=consumer-group-1, correlationId=13, headerVersion=2) and timeout 30000 to node 1: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='topic', partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-1, groupId=group] Fetch read_uncommitted at offset 0 for partition topic-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=151, buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The response to the FetchRequest that was sent before calling seekToBeginning. The FetchState is AWAIT_RESET, so the data is ignored.
> [Consumer clientId=consumer-group-1, groupId=group] Ignoring fetched records for partition topic-0 since it no longer has valid position (org.apache.kafka.clients.consumer.internals.FetchCollector:226)
> [Consumer clientId=consumer-group-1, groupId=group] Resetting offset for partition topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:451) <-- The result of seekToBeginning.
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471) <-- Sends another FetchRequest starting from offset 0.
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1, correlationId=14, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=166492720, sessionEpoch=1, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
> {noformat}
> The following logs show the case for AsyncKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2, correlationId=30, headerVersion=2) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Fetch read_uncommitted at offset 0 for partition topic-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=151, buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The data from the FetchRequest sent before calling seekToBeginning; it has not yet been used by the application thread.
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-2, groupId=group] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, clientId=consumer-group-2, correlationId=31, headerVersion=2) and timeout 30000 to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='topic', partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Resetting offset for partition topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:451)
> ### The stale data from before seekToBeginning is used by the application thread, so the next FetchRequest starts from offset 10.
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=10, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2, correlationId=32, headerVersion=2) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=784149360, sessionEpoch=1, topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=10, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
> {noformat}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)