[ https://issues.apache.org/jira/browse/KAFKA-5610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095282#comment-16095282 ]
ASF GitHub Bot commented on KAFKA-5610:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/kafka/pull/3550

> KafkaApis.handleWriteTxnMarkerRequest can return UNSUPPORTED_FOR_MESSAGE_FORMAT error on partition emigration
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5610
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5610
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>            Priority: Critical
>              Labels: exactly-once
>             Fix For: 0.11.0.1, 0.11.1.0
>
>
> This bug was revealed by the following system test failure:
> http://confluent-systest.s3-website-us-west-2.amazonaws.com/confluent-kafka-system-test-results/?prefix=2017-07-18--001.1500383975--apache--trunk--28c83d9/
>
> What happened was that a commit marker to the offsets topic (sent as part of
> the {{producer.sendOffsetsToTransaction}} method) was lost, causing data to
> be reprocessed and the test to fail.
>
> The bug is that {{KafkaApis.handleWriteTxnMarkersRequest}} returns the wrong
> error code when there is partition emigration.
> In particular, we have:
> {code:java}
> for (marker <- markers.asScala) {
>   val producerId = marker.producerId
>   val (goodPartitions, partitionsWithIncorrectMessageFormat) =
>     marker.partitions.asScala.partition { partition =>
>       replicaManager.getMagic(partition) match {
>         case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true
>         case _ => false
>       }
>     }
>
>   if (partitionsWithIncorrectMessageFormat.nonEmpty) {
>     val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
>     partitionsWithIncorrectMessageFormat.foreach { partition =>
>       currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)
>     }
>     updateErrors(producerId, currentErrors)
>   }
>   // ... remainder of the loop body omitted
> }
> {code}
> But {{replicaManager.getMagic()}} returns {{None}} once the partition has
> emigrated, so {{handleWriteTxnMarkersRequest}} falls into the catch-all case
> and returns an {{UNSUPPORTED_FOR_MESSAGE_FORMAT}} error for that partition.
>
> From the log, we see that the partition emigrated a few milliseconds
> before the {{WriteTxnMarkersRequest}} was sent.
>
> On the old leader, worker10:
> {noformat}
> ./worker10/debug/server.log:32245:[2017-07-18 05:43:20,950] INFO [GroupCoordinator 2]: Unloading group metadata for transactions-test-consumer-group with generation 0 (kafka.coordinator.group.GroupCoordinator)
> {noformat}
>
> On the client:
> {noformat}
> [2017-07-18 05:43:20,959] INFO [Transaction Marker Request Completion Handler 1]: Sending my-first-transactional-id's transaction marker from partition __consumer_offsets-47 has failed with UNSUPPORTED_FOR_MESSAGE_FORMAT. This partition will be removed from the set of partitions waiting for completion (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
> {noformat}
>
> The client received the error response 9 ms after the emigration began on
> the server (05:43:20,950 on worker10 vs. 05:43:20,959 on the client).
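The failure mode can be reproduced in isolation with a small model of the partitioning step quoted above. This is a sketch under loudly stated assumptions: partitions are plain strings, a `Map` stands in for `replicaManager.getMagic` (a missing key plays the role of an emigrated partition), and `MagicV2` mirrors `RecordBatch.MAGIC_VALUE_V2`. None of these names are Kafka APIs.

```scala
// Hypothetical stand-ins for the quoted KafkaApis logic; none of these
// names are Kafka's real classes.
object OriginalMarkerCheck {
  // Mirrors RecordBatch.MAGIC_VALUE_V2, the first message format with transaction support.
  val MagicV2: Byte = 2

  // Stand-in for replicaManager.getMagic: None when this broker does not
  // currently host the partition (for example, right after emigration).
  def getMagic(local: Map[String, Byte], partition: String): Option[Byte] =
    local.get(partition)

  // Same shape as the quoted code: anything other than Some(magic >= V2),
  // including None, ends up in the "incorrect message format" bucket.
  def split(local: Map[String, Byte], partitions: Seq[String]): (Seq[String], Seq[String]) =
    partitions.partition { p =>
      getMagic(local, p) match {
        case Some(magic) if magic >= MagicV2 => true
        case _                               => false
      }
    }
}
```

With a local map holding `__consumer_offsets-12` at magic 2 and a hypothetical `legacy-0` at magic 1, splitting `Seq("__consumer_offsets-12", "legacy-0", "__consumer_offsets-47")` puts both `legacy-0` and the absent `__consumer_offsets-47` in the second list, so the emigrated partition is reported exactly like a partition on an old message format.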
>
> Since it is perfectly acceptable for leader and ISR metadata to be propagated
> asynchronously, KafkaApis should handle emigration more robustly, so that it
> returns the right error code when handling a request for a partition for which
> the broker is no longer the leader.
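One way to act on this recommendation, sketched under the same simplifying assumptions (strings for partitions, a `Map` standing in for `ReplicaManager`, hypothetical local error names), is to match the three cases explicitly instead of folding `None` into the old-format branch. Reporting a not-leader style error for `None` is an assumption of this sketch; it is not necessarily what the linked pull request does.

```scala
// Hypothetical sketch of an emigration-aware check. The error names echo
// Kafka's Errors enum but are local stand-ins, not Kafka classes.
object RobustMarkerCheck {
  sealed trait MarkerError
  case object UnsupportedForMessageFormat extends MarkerError
  case object NotLeaderForPartition extends MarkerError

  // Mirrors RecordBatch.MAGIC_VALUE_V2.
  val MagicV2: Byte = 2

  // None means "write the marker"; Some(error) means "reject with this code".
  def classify(magic: Option[Byte]): Option[MarkerError] = magic match {
    case Some(m) if m >= MagicV2 => None                              // supported format
    case Some(_)                 => Some(UnsupportedForMessageFormat) // genuinely old format
    case None                    => Some(NotLeaderForPartition)       // not hosted here any more
  }

  // Splits partitions into those to write and a per-partition error map.
  def split(local: Map[String, Byte], partitions: Seq[String]): (Seq[String], Map[String, MarkerError]) = {
    val results  = partitions.map(p => p -> classify(local.get(p)))
    val writable = results.collect { case (p, None) => p }
    val errors   = results.collect { case (p, Some(e)) => p -> e }.toMap
    (writable, errors)
  }
}
```

Keeping the two cases apart lets the sender tell a partition that can never accept a transaction marker from one whose leadership simply moved, instead of dropping both from the set of partitions waiting for completion, as the log above shows for `__consumer_offsets-47`.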