[ 
https://issues.apache.org/jira/browse/KAFKA-5610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095282#comment-16095282
 ] 

ASF GitHub Bot commented on KAFKA-5610:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/kafka/pull/3550


> KafkaApis.handleWriteTxnMarkerRequest can return 
> UNSUPPORTED_FOR_MESSAGE_FORMAT error on partition emigration
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5610
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5610
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>            Priority: Critical
>              Labels: exactly-once
>             Fix For: 0.11.0.1, 0.11.1.0
>
>
> This bug was revealed by the following system test failure 
> http://confluent-systest.s3-website-us-west-2.amazonaws.com/confluent-kafka-system-test-results/?prefix=2017-07-18--001.1500383975--apache--trunk--28c83d9/
> What happened was that a commit marker to the offsets topic (sent as part of 
> the {{producer.sendOffsetsToTransaction}} method) was lost, causing data to 
> be reprocessed, and hence causing the test to fail. 
> The bug is that {{handleWriteTxnMarkersRequest}} returns the wrong error 
> code when a partition emigrates. In particular, we have: 
> {code:java}
> for (marker <- markers.asScala) {
>       val producerId = marker.producerId
>       val (goodPartitions, partitionsWithIncorrectMessageFormat) = marker.partitions.asScala.partition { partition =>
>         replicaManager.getMagic(partition) match {
>           case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true
>           case _ => false
>         }
>       }
>       if (partitionsWithIncorrectMessageFormat.nonEmpty) {
>         val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
>         partitionsWithIncorrectMessageFormat.foreach { partition => currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
>         updateErrors(producerId, currentErrors)
>       }
> {code}
> But the {{replicaManager.getMagic()}} call will return {{None}} when the 
> partition emigrates, causing the {{handleWriteTxnMarkersRequest}} to return 
> an {{UNSUPPORTED_FOR_MESSAGE_FORMAT}} error. 
> From the log, we see that the partition did emigrate a few milliseconds 
> before the {{WriteTxnMarkerRequest}} was sent.
> On the old leader, worker10:
> {noformat}
> ./worker10/debug/server.log:32245:[2017-07-18 05:43:20,950] INFO 
> [GroupCoordinator 2]: Unloading group metadata for 
> transactions-test-consumer-group with generation 0 
> (kafka.coordinator.group.GroupCoordinator)
> {noformat}
> On the client: 
> {noformat}
> [2017-07-18 05:43:20,959] INFO [Transaction Marker Request Completion Handler 
> 1]: Sending my-first-transactional-id's transaction marker from partition 
> __consumer_offsets-47 has failed with  UNSUPPORTED_FOR_MESSAGE_FORMAT. This 
> partition will be removed from the set of partitions waiting for completion 
> (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
> {noformat}
> As you can see, the client received the response 9 ms after the emigration 
> was initiated on the server.
> Since it is perfectly acceptable for the LeaderISR metadata to be propagated 
> asynchronously, we should have more robust handling of emigration in 
> KafkaApis so that it returns the right error code when handling a request for 
> a partition for which the broker is no longer the leader.
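
The handling asked for above could look roughly like the following. This is a minimal, self-contained sketch, not the change made in the pull request: the types are stand-ins for Kafka's, the getMagic parameter is a hypothetical substitute for replicaManager.getMagic, and PartitionNotHosted is a placeholder for whichever error code the broker should return when it no longer hosts the partition. The point is only that None (partition gone) and Some(old magic) (old message format) are reported as different errors instead of both falling into the "case _" branch.

```scala
// Illustrative stand-ins only; none of this is the broker's actual code.
object MarkerErrorSketch {
  sealed trait Errors
  case object UnsupportedForMessageFormat extends Errors
  // Placeholder for the error returned when the broker no longer hosts the partition.
  case object PartitionNotHosted extends Errors

  final case class TopicPartition(topic: String, partition: Int)

  // Mirrors the role of RecordBatch.MAGIC_VALUE_V2 in the quoted snippet.
  val MagicValueV2: Byte = 2

  /** Splits the partitions of one marker into those that can accept it and
    * those that cannot, keeping "old message format" separate from
    * "partition not hosted on this broker". */
  def classify(
      partitions: Seq[TopicPartition],
      getMagic: TopicPartition => Option[Byte] // hypothetical stand-in for replicaManager.getMagic
  ): (Seq[TopicPartition], Map[TopicPartition, Errors]) = {
    val good = Seq.newBuilder[TopicPartition]
    val errors = Map.newBuilder[TopicPartition, Errors]
    partitions.foreach { tp =>
      getMagic(tp) match {
        case Some(magic) if magic >= MagicValueV2 => good += tp
        case Some(_) => errors += (tp -> UnsupportedForMessageFormat)
        case None => errors += (tp -> PartitionNotHosted)
      }
    }
    (good.result(), errors.result())
  }
}
```

With three partitions, one on the new format, one on an old format, and one that has emigrated, classify returns the first as good and reports a distinct error for each of the other two, so the sender can retry the emigrated partition against the new leader rather than dropping it.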



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
