[ https://issues.apache.org/jira/browse/KAFKA-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940405#comment-17940405 ]

PoAn Yang commented on KAFKA-19067:
-----------------------------------

[~chia7712] Thanks for the comment. As we discussed offline, the fetch buffer 
data starts from offset 0 and seekToBeginning also sets the subscription state 
position to 0, so the stale data passes the fetch position check. Since the 
data at a specific offset is immutable (assuming the consumer uses 
read_committed), this case doesn't really affect production behavior. I will 
close the issue.
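
A toy sketch of the position check (hypothetical names, not the actual 
FetchCollector internals) shows why the stale buffer is harmless in the 
seekToBeginning case:

```java
// Hypothetical simulation of the fetch-position validation; names are
// illustrative and do not match the real Kafka client internals.
public class StaleFetchPositionCheck {

    // Buffered fetch data is only returned when its starting offset matches
    // the current subscription position.
    static boolean returnsBufferedData(long bufferedOffset, long currentPosition) {
        return bufferedOffset == currentPosition;
    }

    public static void main(String[] args) {
        long bufferedOffset = 0; // fetch issued at position 0 before the seek

        // seekToBeginning: the log start offset is 0, so the position is reset
        // to 0 again. The stale buffer lines up, and because records at a
        // given offset are immutable, returning it is indistinguishable from
        // a fresh fetch.
        System.out.println(returnsBufferedData(bufferedOffset, 0)); // true

        // A seek to any other offset fails the check, so the buffer is dropped.
        System.out.println(returnsBufferedData(bufferedOffset, 5)); // false
    }
}
```

A seek that lands anywhere other than the buffered offset would fail this 
check, which is why the race only ever surfaces data the consumer would have 
fetched anyway.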

> AsyncKafkaConsumer may return stale fetch result after seek operation
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-19067
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19067
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: PoAn Yang
>            Assignee: PoAn Yang
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> The KafkaConsumer sends a FetchRequest after it subscribes to topics, and the 
> FetchResponse data is stored in the FetchBuffer. On a KafkaConsumer#seek 
> operation, the FetchState changes to AWAIT_RESET and the consumer sends a 
> LIST_OFFSETS request. The state changes back to FETCHING after the consumer 
> receives the LIST_OFFSETS response.
> If a KafkaConsumer subscribes to topics and then calls seek, there may be 
> stale FetchResponse data in the FetchBuffer. ClassicKafkaConsumer#poll reads 
> data from the FetchBuffer first and then calls ConsumerNetworkClient#poll. 
> Any stale data in the FetchBuffer is ignored because the FetchState is 
> AWAIT_RESET; the FetchState only changes back to FETCHING after 
> ConsumerNetworkClient#poll receives the LIST_OFFSETS response.
> However, AsyncKafkaConsumer may return stale FetchResponse data to users, 
> because the ConsumerNetworkThread runs in a separate thread: the FetchState 
> may change back to FETCHING before AsyncKafkaConsumer#poll performs the 
> valid-position check.
> The following logs show the case for ClassicKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-1, correlationId=12, headerVersion=2) and timeout 
> 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, 
> topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw, 
> partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, 
> lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, 
> replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], 
> rackId='') (org.apache.kafka.clients.NetworkClient:604)
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-1, groupId=group] Sending LIST_OFFSETS 
> request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, 
> clientId=consumer-group-1, correlationId=13, headerVersion=2) and timeout 
> 30000 to node 1: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, 
> topics=[ListOffsetsTopic(name='topic', 
> partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, 
> timestamp=-2)])], timeoutMs=30000) 
> (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-1, groupId=group] Fetch read_uncommitted at 
> offset 0 for partition topic-0 returned fetch data 
> PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, 
> lastStableOffset=10, logStartOffset=0, 
> divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), 
> currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), 
> snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, 
> preferredReadReplica=-1, records=MemoryRecords(size=151, 
> buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The 
> response to the FetchRequest sent before seekToBeginning was called. The 
> FetchState is AWAIT_RESET, so the data is ignored.
> [Consumer clientId=consumer-group-1, groupId=group] Ignoring fetched records 
> for partition topic-0 since it no longer has valid position 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:226)
> [Consumer clientId=consumer-group-1, groupId=group] Resetting offset for 
> partition topic-0 to position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: 
> null isFenced: false)], epoch=0}}. 
> (org.apache.kafka.clients.consumer.internals.SubscriptionState:451) <-- The 
> result of seekToBeginning.
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471) <-- Sends 
> another FetchRequest starting from offset 0.
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-1, correlationId=14, headerVersion=2) and timeout 
> 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=166492720, 
> sessionEpoch=1, topics=[], forgottenTopicsData=[], rackId='') 
> (org.apache.kafka.clients.NetworkClient:604)
> {noformat}
> The following logs show the case for AsyncKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-2, correlationId=30, headerVersion=2) and timeout 
> 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, 
> topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, 
> partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, 
> lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, 
> replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], 
> rackId='') (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Fetch read_uncommitted at 
> offset 0 for partition topic-0 returned fetch data 
> PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, 
> lastStableOffset=10, logStartOffset=0, 
> divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), 
> currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), 
> snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, 
> preferredReadReplica=-1, records=MemoryRecords(size=151, 
> buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The data 
> from the FetchRequest sent before seekToBeginning was called; it has not yet 
> been consumed by the application thread.
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-2, groupId=group] Sending LIST_OFFSETS 
> request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, 
> clientId=consumer-group-2, correlationId=31, headerVersion=2) and timeout 
> 30000 to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, 
> topics=[ListOffsetsTopic(name='topic', 
> partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, 
> timestamp=-2)])], timeoutMs=30000) 
> (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Resetting offset for 
> partition topic-0 to position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: 
> null isFenced: false)], epoch=0}}. 
> (org.apache.kafka.clients.consumer.internals.SubscriptionState:451)
> ### The stale data from before seekToBeginning is consumed by the application 
> thread, so the next FetchRequest starts from offset 10.
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=10, 
> offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-2, correlationId=32, headerVersion=2) and timeout 
> 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=784149360, 
> sessionEpoch=1, topics=[FetchTopic(topic='topic', 
> topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, 
> currentLeaderEpoch=0, fetchOffset=10, lastFetchedEpoch=-1, logStartOffset=-1, 
> partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], 
> forgottenTopicsData=[], rackId='') 
> (org.apache.kafka.clients.NetworkClient:604)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)