[ https://issues.apache.org/jira/browse/KAFKA-17139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17874143#comment-17874143 ]
Yu Wang commented on KAFKA-17139:
---------------------------------

[~ChrisEgerton] could you help to review this issue?

MirrorSourceTask will stop mirroring when it gets a BufferUnderflowException
-----------------------------------------------------------------------------

                Key: KAFKA-17139
                URL: https://issues.apache.org/jira/browse/KAFKA-17139
            Project: Kafka
         Issue Type: Bug
         Components: connect, mirrormaker
   Affects Versions: 3.0.0, 3.5.2, 3.6.2, 3.7.1
           Reporter: Yu Wang
           Assignee: Yu Wang
           Priority: Major
        Attachments: image-2024-07-15-15-35-12-489.png

Recently we found that data mirroring for one of our partitions stopped after the following exception was thrown:

{code:java}
[2024-07-05 13:36:07,058] WARN Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask)
org.apache.kafka.common.protocol.types.SchemaException: Buffer underflow while parsing response for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-null-8, correlationId=-855959214)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:722)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:865)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:141)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:510)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:427)
    at org.apache.kafka.common.protocol.ByteBufferAccessor.readLong(ByteBufferAccessor.java:48)
    at org.apache.kafka.common.message.FetchResponseData$AbortedTransaction.read(FetchResponseData.java:1928)
    at org.apache.kafka.common.message.FetchResponseData$AbortedTransaction.<init>(FetchResponseData.java:1904)
    at org.apache.kafka.common.message.FetchResponseData$PartitionData.read(FetchResponseData.java:881)
    at org.apache.kafka.common.message.FetchResponseData$PartitionData.<init>(FetchResponseData.java:805)
    at org.apache.kafka.common.message.FetchResponseData$FetchableTopicResponse.read(FetchResponseData.java:524)
    at org.apache.kafka.common.message.FetchResponseData$FetchableTopicResponse.<init>(FetchResponseData.java:464)
    at org.apache.kafka.common.message.FetchResponseData.read(FetchResponseData.java:199)
    at org.apache.kafka.common.message.FetchResponseData.<init>(FetchResponseData.java:136)
    at org.apache.kafka.common.requests.FetchResponse.parse(FetchResponse.java:119)
    at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:117)
    at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:720)
    ... 17 more
{code}

The exception was thrown only once; after that the consumer stopped fetching from that node, and the request rate to one of the Kafka brokers dropped to 0:

!image-2024-07-15-15-35-12-489.png|width=601,height=233!

After going through the KafkaConsumer code, we found that every time the KafkaConsumer generates fetch requests for the Kafka brokers, it checks whether the target broker already exists in *nodesWithPendingFetchRequests*. If it does, the target broker is skipped in this round (a simplified sketch of this gating follows below).

[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L433]
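For illustration only, here is a minimal sketch of that bookkeeping. It is not the actual AbstractFetch code; the class and method names below are hypothetical, while *nodesWithPendingFetchRequests* is the field name referenced above:

{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the pending-fetch gating described above.
// Not the real Kafka code; the names here are illustrative only.
public class PendingFetchGateSketch {

    // Broker ids that still have an in-flight FETCH request.
    private final Set<Integer> nodesWithPendingFetchRequests = new HashSet<>();

    // Recorded when a fetch request is sent to a broker.
    void onFetchRequestSent(int nodeId) {
        nodesWithPendingFetchRequests.add(nodeId);
    }

    // Invoked from the response-completion callback. If that callback never
    // runs (e.g. because parsing the response threw), the node is never removed.
    void onFetchResponseCompleted(int nodeId) {
        nodesWithPendingFetchRequests.remove(nodeId);
    }

    // The gating check: a broker that still appears "pending" is skipped
    // when the next round of fetch requests is prepared.
    boolean canSendFetchTo(int nodeId) {
        return !nodesWithPendingFetchRequests.contains(nodeId);
    }
}
{code}

With this kind of bookkeeping, a broker whose completion callback never fires stays in the set forever, which matches the behavior we observed.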
The broker id can be removed from that set only when the response has completed.

[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L600]

But in this case the exception was thrown in *handleCompletedReceives*, which means the node id will never be removed from *nodesWithPendingFetchRequests*.

[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L594]

{code:java}
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses); {code}

So every time the Kafka Connect framework calls the *poll* method of the MirrorSourceTask, the KafkaConsumer skips fetching from the node id that is left behind in *nodesWithPendingFetchRequests*.

This makes the MirrorMaker task stop data mirroring with only a single WARN log.
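To make this failure mode concrete, here is a minimal, self-contained sketch. It is not the real NetworkClient or AbstractFetch code; it only reproduces the ordering shown above, with a plain set standing in for *nodesWithPendingFetchRequests* and a callback list standing in for the response-completion handlers:

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: shows why a parsing failure that happens before
// completeResponses() leaves the node marked as "pending" forever.
public class StuckPendingFetchSketch {

    static final Set<Integer> nodesWithPendingFetchRequests = new HashSet<>();
    static final List<Runnable> completionCallbacks = new ArrayList<>();

    // Stand-in for the poll sequence: receives are handled first, and the
    // completion callbacks (which clear the pending marker) run only at the end.
    static void poll(boolean responseCorrupted) {
        // "handleCompletedReceives": parsing the FETCH response may throw,
        // e.g. a SchemaException caused by a BufferUnderflowException.
        if (responseCorrupted) {
            throw new RuntimeException("Buffer underflow while parsing response");
        }
        completionCallbacks.add(() -> nodesWithPendingFetchRequests.remove(1));

        // "completeResponses": only here is node 1 removed from the pending set.
        completionCallbacks.forEach(Runnable::run);
        completionCallbacks.clear();
    }

    public static void main(String[] args) {
        nodesWithPendingFetchRequests.add(1); // a fetch request was sent to node 1
        try {
            poll(true); // parsing fails, so poll() throws before completeResponses()
        } catch (RuntimeException e) {
            System.out.println("WARN Failure during poll: " + e.getMessage());
        }
        // Node 1 is still marked as pending, so every later poll skips it.
        System.out.println("Still pending: " + nodesWithPendingFetchRequests);
    }
}
{code}

In the sketch, node 1 stays in the set after the failed poll, mirroring the stuck state described above: the task keeps polling, but fetches to that broker are silently skipped.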