[ https://issues.apache.org/jira/browse/KAFKA-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940129#comment-17940129 ]
Lianet Magrans commented on KAFKA-19067:
----------------------------------------

Hey [~yangpoan], thanks for filing! I'm still getting my head around this, but my first thought is that we could indeed have a tricky situation here; I'm not sure about the impact yet, though.

There is indeed a rather risky window in which we briefly transition to FETCHING in the background before validating the position: [https://github.com/apache/kafka/blob/1eea7f0528954ce8dcbcc4357ae2ef28c1d1e5f2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L1133-L1134]

So if the app thread happens to hit the fetch path right in that window, it would wrongly treat the position as valid, even though it may still need validation, e.g. here: [https://github.com/apache/kafka/blob/cee55dbdeca79cba5cbfca2f53a37344ebe2e38a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L223]

Even though that check would be wrong, I expect it wouldn't return buffered data from a different position, because of the app thread's check for that, right? [https://github.com/apache/kafka/blob/cee55dbdeca79cba5cbfca2f53a37344ebe2e38a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L254]

So all of the above concerns the buffered data kept in the app thread, and how it could be affected by the background briefly transitioning to FETCHING before validating. Then there is the other side of the story: the fetch request that gets generated (which is what the logs you shared show). But fetch requests are always generated in the background, so I would expect no race there, right? How exactly would we get to generate a fetch for position 10 (in the background) if position 10 hasn't been validated yet?
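To make that window concrete, here is a minimal sketch of the transient-state problem; the class, enum, and method names are illustrative stand-ins, not the actual SubscriptionState/FetchCollector API:

{code:java}
// Illustrative only: hypothetical names, not the real Kafka consumer internals.
public class TransientFetchingWindow {
    enum FetchState { AWAIT_RESET, AWAIT_VALIDATION, FETCHING }

    // Shared between the background (network) thread and the app thread.
    static volatile FetchState state = FetchState.AWAIT_RESET;

    // App-thread-style check: FETCHING is read as "has a valid position".
    static boolean hasValidPosition() {
        return state == FetchState.FETCHING;
    }

    public static void main(String[] args) {
        // Background thread applies a reset: it first moves to FETCHING...
        state = FetchState.FETCHING;
        // ...and an app thread reading in this window sees a "valid" position,
        boolean seenInWindow = hasValidPosition();
        // ...even though validation only runs afterwards and may demote the state.
        state = FetchState.AWAIT_VALIDATION;
        boolean seenAfter = hasValidPosition();
        System.out.println(seenInWindow + " " + seenAfter); // true false
    }
}
{code}

The sketch collapses both threads into one deterministic interleaving purely to show the ordering; in the real consumer the two writes and the read race across threads.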
This log:
{code:java}
Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=10...{code}
I would expect the partition not to be fetchable there, because it does not have a valid position, so it would not be included in the fetch (in that path we couldn't be briefly FETCHING, since it all happens in the same thread).

Could be missing something, thinking out loud here; let me know your thoughts. Thanks!

> AsyncKafkaConsumer may return stale fetch result after seek operation
> ---------------------------------------------------------------------
>
>          Key: KAFKA-19067
>          URL: https://issues.apache.org/jira/browse/KAFKA-19067
>      Project: Kafka
>   Issue Type: Bug
>     Reporter: PoAn Yang
>     Assignee: PoAn Yang
>     Priority: Major
>
> The KafkaConsumer sends a FetchRequest after it subscribes to topics. The
> FetchResponse data is stored in the FetchBuffer. On a KafkaConsumer#seek
> operation, the FetchState changes to AWAIT_RESET and the consumer sends a
> LIST_OFFSETS request. The state changes back to FETCHING after the consumer
> receives the LIST_OFFSETS response.
>
> If a KafkaConsumer subscribes to topics and then calls seek, there may be
> stale FetchResponse data in the FetchBuffer. ClassicKafkaConsumer#poll reads
> from the FetchBuffer first and then calls ConsumerNetworkClient#poll. If
> there is stale data in the FetchBuffer, it is ignored because the FetchState
> is AWAIT_RESET; the FetchState in the ClassicKafkaConsumer changes back to
> FETCHING only after ConsumerNetworkClient#poll receives the LIST_OFFSETS
> response.
>
> However, the AsyncKafkaConsumer may return stale FetchResponse data to
> users, because the ConsumerNetworkThread runs in a separate thread: the
> FetchState may change back to FETCHING before AsyncKafkaConsumer#poll
> performs the valid-position check.
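The interleaving described in the report can be condensed into a small timeline sketch; the names below are hypothetical simplifications of the consumer internals, and the offsets match the logs that follow (a buffered fetch at offset 0 whose records run up to, but not including, offset 10):

{code:java}
// Hypothetical condensation of the AsyncKafkaConsumer interleaving; plain
// fields stand in for the SubscriptionState/FetchBuffer internals.
public class StaleFetchTimeline {

    static long nextFetchOffset() {
        long bufferedFetchOffset = 0L; // background completed a FETCH at offset 0
        long bufferedNextOffset = 10L; // the buffer holds records 0..9

        long position = 0L;            // seekToBeginning(): position reset to 0
        // The background thread flips the FetchState back to FETCHING before the
        // app thread polls, so the validity check passes; the offset check also
        // passes, because the stale buffer happens to start at the new position.
        if (bufferedFetchOffset == position) {
            position = bufferedNextOffset; // stale records returned to the user
        }
        return position;               // the next FETCH request is built from here
    }

    public static void main(String[] args) {
        System.out.println(nextFetchOffset()); // 10
    }
}
{code}

Note that the second guard (buffered offset == current position) does not help here, because seeking to the beginning resets the position to the very offset the stale buffer starts at.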
> Following logs show the case for ClassicKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1, correlationId=12, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-1, groupId=group] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, clientId=consumer-group-1, correlationId=13, headerVersion=2) and timeout 30000 to node 1: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='topic', partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-1, groupId=group] Fetch read_uncommitted at offset 0 for partition topic-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=151, buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The response to the FetchRequest sent before calling seekToBeginning. The FetchState is AWAIT_RESET, so the data is ignored.
> [Consumer clientId=consumer-group-1, groupId=group] Ignoring fetched records for partition topic-0 since it no longer has valid position (org.apache.kafka.clients.consumer.internals.FetchCollector:226)
> [Consumer clientId=consumer-group-1, groupId=group] Resetting offset for partition topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:451) <-- The result of seekToBeginning.
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471) <-- Sends another FetchRequest starting from offset 0.
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-1, correlationId=14, headerVersion=2) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=166492720, sessionEpoch=1, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
> {noformat}
> Following logs show the case for AsyncKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2, correlationId=30, headerVersion=2) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Fetch read_uncommitted at offset 0 for partition topic-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=151, buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154])) (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The data from the FetchRequest sent before calling seekToBeginning; it has not yet been consumed by the application thread.
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-2, groupId=group] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10, clientId=consumer-group-2, correlationId=31, headerVersion=2) and timeout 30000 to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='topic', partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0, timestamp=-2)])], timeoutMs=30000) (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Resetting offset for partition topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:451)
> ### The stale data from before seekToBeginning is consumed by the application thread, so the next FetchRequest starts from offset 10.
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted fetch request for partition topic-0 at position FetchPosition{offset=10, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack: null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=consumer-group-2, correlationId=32, headerVersion=2) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=784149360, sessionEpoch=1, topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=10, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:604)
> {noformat}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)