[ 
https://issues.apache.org/jira/browse/KAFKA-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940189#comment-17940189
 ] 

Kirk True commented on KAFKA-19067:
-----------------------------------

I noticed that [some of the seek methods in SubscriptionState are synchronized 
while others are 
not|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L426-L436].
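
A hypothetical sketch (the names here are illustrative, not Kafka's actual code) of why that mix could matter: if one writer to a shared position is synchronized and another is not, the unsynchronized path gets neither mutual exclusion nor a happens-before edge, so even a reader holding the lock can observe a stale or interleaved update.

```java
// Illustrative only: models mixed locking on shared consumer position state.
class MixedSync {
    private long position = 0;

    // Writer A: updates under the object lock.
    synchronized void seekSynchronized(long offset) {
        position = offset;
    }

    // Writer B: no lock -- it can interleave with seekSynchronized, and its
    // write has no happens-before edge to the synchronized readers below.
    void seekUnsynchronized(long offset) {
        position = offset;
    }

    synchronized long position() {
        return position;
    }
}
```

Making every accessor of the shared state take the same lock (or making the field volatile if each update is a single write) removes both hazards.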

> AsyncKafkaConsumer may return stale fetch result after seek operation
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-19067
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19067
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: PoAn Yang
>            Assignee: PoAn Yang
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> The KafkaConsumer sends a FetchRequest after it subscribes to topics, and 
> the FetchResponse data is stored in the FetchBuffer. For the 
> KafkaConsumer#seek operation, the FetchState changes to AWAIT_RESET and the 
> consumer sends a LIST_OFFSETS request. The state changes back to FETCHING 
> after the consumer receives the LIST_OFFSETS response.
> If a KafkaConsumer subscribes to topics and then calls seek, there may be 
> stale FetchResponse data in the FetchBuffer. ClassicKafkaConsumer#poll reads 
> from the FetchBuffer first and then calls ConsumerNetworkClient#poll. Any 
> stale data in the FetchBuffer is ignored because the FetchState is 
> AWAIT_RESET; the FetchState changes back to FETCHING only after 
> ConsumerNetworkClient#poll receives the LIST_OFFSETS response.
> However, AsyncKafkaConsumer may return stale FetchResponse data to users, 
> because the ConsumerNetworkThread runs in a separate thread: the FetchState 
> may change back to FETCHING before AsyncKafkaConsumer#poll performs the 
> valid-position check.
> The following logs show the case for ClassicKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-1, correlationId=12, headerVersion=2) and timeout 
> 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, 
> topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw, 
> partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, 
> lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, 
> replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], 
> rackId='') (org.apache.kafka.clients.NetworkClient:604)
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-1, groupId=group] Sending LIST_OFFSETS 
> request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, 
> clientId=consumer-group-1, correlationId=13, headerVersion=2) and timeout 
> 30000 to node 1: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, 
> topics=[ListOffsetsTopic(name='topic', 
> partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, 
> timestamp=-2)])], timeoutMs=30000) 
> (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-1, groupId=group] Fetch read_uncommitted at 
> offset 0 for partition topic-0 returned fetch data 
> PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, 
> lastStableOffset=10, logStartOffset=0, 
> divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), 
> currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), 
> snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, 
> preferredReadReplica=-1, records=MemoryRecords(size=151, 
> buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The 
> response to the FetchRequest that was sent before calling seekToBeginning. 
> The FetchState is AWAIT_RESET, so the data is ignored.
> [Consumer clientId=consumer-group-1, groupId=group] Ignoring fetched records 
> for partition topic-0 since it no longer has valid position 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:226)
> [Consumer clientId=consumer-group-1, groupId=group] Resetting offset for 
> partition topic-0 to position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: 
> null isFenced: false)], epoch=0}}. 
> (org.apache.kafka.clients.consumer.internals.SubscriptionState:451) <-- The 
> result of seekToBeginning.
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471) <-- Sends 
> another FetchRequest starting from offset 0.
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-1, correlationId=14, headerVersion=2) and timeout 
> 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=166492720, 
> sessionEpoch=1, topics=[], forgottenTopicsData=[], rackId='') 
> (org.apache.kafka.clients.NetworkClient:604)
> {noformat}
> The following logs show the case for AsyncKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-2, correlationId=30, headerVersion=2) and timeout 
> 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, 
> topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, 
> partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, 
> lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, 
> replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], 
> rackId='') (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Fetch read_uncommitted at 
> offset 0 for partition topic-0 returned fetch data 
> PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, 
> lastStableOffset=10, logStartOffset=0, 
> divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), 
> currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), 
> snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, 
> preferredReadReplica=-1, records=MemoryRecords(size=151, 
> buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The data 
> from the FetchRequest sent before calling seekToBeginning; it has not yet 
> been consumed by the application thread.
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-2, groupId=group] Sending LIST_OFFSETS 
> request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, 
> clientId=consumer-group-2, correlationId=31, headerVersion=2) and timeout 
> 30000 to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, 
> topics=[ListOffsetsTopic(name='topic', 
> partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, 
> timestamp=-2)])], timeoutMs=30000) 
> (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Resetting offset for 
> partition topic-0 to position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: 
> null isFenced: false)], epoch=0}}. 
> (org.apache.kafka.clients.consumer.internals.SubscriptionState:451)
> ### The stale data fetched before seekToBeginning is consumed by the 
> application thread, so the next FetchRequest starts from offset 10.
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=10, 
> offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-2, correlationId=32, headerVersion=2) and timeout 
> 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=784149360, 
> sessionEpoch=1, topics=[FetchTopic(topic='topic', 
> topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, 
> currentLeaderEpoch=0, fetchOffset=10, lastFetchedEpoch=-1, logStartOffset=-1, 
> partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], 
> forgottenTopicsData=[], rackId='') 
> (org.apache.kafka.clients.NetworkClient:604)
> {noformat}
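
The ordering difference above can be modeled with a small, single-threaded sketch (the names FetchStateModel, CompletedFetch, and collect are hypothetical, not Kafka's internals). Buffered fetch data must be discarded while the position is being reset; but if the background thread handles the LIST_OFFSETS response and flips the state back to FETCHING before the application thread collects, the same stale batch passes the state check:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the FetchBuffer/FetchState interplay.
class FetchStateModel {
    enum State { FETCHING, AWAIT_RESET }

    // fetchOffset records the position the request was built from; a fix
    // could compare it against the current position instead of the state.
    record CompletedFetch(long fetchOffset, List<String> records) {}

    private State state = State.FETCHING;
    private final Queue<CompletedFetch> buffer = new ArrayDeque<>();

    // FetchResponse arrives: data is buffered, whatever the current state.
    void onFetchResponse(long fetchOffset, List<String> records) {
        buffer.add(new CompletedFetch(fetchOffset, records));
    }

    // seek(): the position is undefined until LIST_OFFSETS completes.
    void seekToBeginning() {
        state = State.AWAIT_RESET;
    }

    // LIST_OFFSETS response (handled by the background thread in the
    // async consumer): the position is considered valid again.
    void onListOffsetsResponse() {
        state = State.FETCHING;
    }

    // Mirrors the FetchCollector check: drop buffered data while the
    // position is being reset, otherwise return it to the caller.
    List<String> collect() {
        CompletedFetch cf = buffer.poll();
        if (cf == null || state == State.AWAIT_RESET) {
            return List.of(); // nothing buffered, or stale data ignored
        }
        return cf.records();
    }
}
```

With the classic ordering (collect runs before the reset completes), the stale batch is dropped; with the async ordering (the background thread completes the reset first), the same stale batch is returned, which is the behavior reported here.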



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
