[ https://issues.apache.org/jira/browse/KAFKA-6119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219806#comment-16219806 ]
ASF GitHub Bot commented on KAFKA-6119:
---------------------------------------

GitHub user apurvam opened a pull request:

    https://github.com/apache/kafka/pull/4137

    KAFKA-6119: Bump epoch when expiring transactions in the TransactionCoordinator

    A description of the problem is in the JIRA. I have added an integration test which reproduces the original scenario, and also added unit test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apurvam/kafka KAFKA-6119-bump-epoch-when-expiring-transactions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4137

----
commit 4405e4f9c30e82864417fdbafe3817ee4acee661
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-10-26T00:58:44Z

    Bump the epoch when we abort a transaction on the coordinator

commit 9945b4f8315dc8c82aaeb003c07458a6231ee96c
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-10-26T01:06:04Z

    Lock the transaction metadata before fencing the epoch. Make the case matching exhaustive

----


> Silent Data Loss in Kafka011 Transactional Producer
> ---------------------------------------------------
>
>                 Key: KAFKA-6119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6119
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 0.11.0.0, 0.11.0.1
>         Environment: openjdk version "1.8.0_144"
> OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
> OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
>            Reporter: Gary Y.
>            Assignee: Apurva Mehta
>            Priority: Blocker
>              Labels: reliability
>             Fix For: 1.0.0, 0.11.0.2
>
>
> Kafka can lose data published by a transactional {{KafkaProducer}} under some circumstances, i.e., data that should be committed atomically may not be fully visible from a consumer with {{read_committed}} isolation level.
>
> *Steps to reproduce:*
> # Set {{transaction.timeout.ms}} to a low value such as {{100}}
> # Publish two messages in one transaction to different partitions of a topic with a sufficiently long time in-between the messages (e.g., 70 s).
> # Only the second message is visible with {{read_committed}} isolation level.
>
> See https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java for a full example. Detailed instructions can be found in the {{README.md}}:
> https://github.com/GJL/kafka011-transactional-producer-bug-demo
>
> *Why is this possible?*
> Because the transaction timeout is set to a low value, the transaction will be rolled back quickly after the first message is sent. Indeed, in the broker the following logs could be found:
> {code}
> [2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized transactionalId test-producer-1508964897483 with producerId 5 and producer epoch 0 on partition __transaction_state-10 (kafka.coordinator.transaction.TransactionCoordinator)
> [2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback ongoing transaction of transactionalId: test-producer-1508964897483 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
> {code}
> After rollback, the second message is sent to a different partition than the first message.
> Upon transaction commit, {{org.apache.kafka.clients.producer.internals.TransactionManager}} may enqueue the request {{addPartitionsToTransactionHandler}}:
> {code}
> private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
>     if (!newPartitionsInTransaction.isEmpty())
>         enqueueRequest(addPartitionsToTransactionHandler());
>     EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
>             producerIdAndEpoch.epoch, transactionResult);
>     EndTxnHandler handler = new EndTxnHandler(builder);
>     enqueueRequest(handler);
>     return handler.result;
> }
> {code}
> As can be seen, the condition is fulfilled if {{newPartitionsInTransaction}} is non-empty. I suspect that, because the second message goes to a different partition, this condition is satisfied.
> In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}} may eventually call {{TransactionMetadata#prepareAddPartitions}}:
> {code}
> def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
>   val newTxnStartTimestamp = state match {
>     case Empty | CompleteAbort | CompleteCommit => updateTimestamp
>     case _ => txnStartTimestamp
>   }
>   prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
>     newTxnStartTimestamp, updateTimestamp)
> }
> {code}
> Note that the first argument, {{newState}}, of {{prepareTransitionTo}} is always *Ongoing* here. I suspect that this puts the transaction, which should be aborted, back into the _Ongoing_ state.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
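
The failure mode suspected in the quoted issue, and the effect of bumping the epoch as the pull request title describes, can be illustrated with a small stand-alone toy model. This is a sketch under stated assumptions, not Kafka code: {{EpochFencingSketch}}, {{ToyCoordinator}}, {{replay}} and the {{bumpEpochOnExpiry}} switch are hypothetical names, partitions are plain integers, and the model assumes that a timed-out transaction forgets its partitions when it is aborted. It also simplifies the timing: the second partition is added as soon as the second message is sent.

```java
import java.util.Set;
import java.util.TreeSet;

/** Toy model of a single transactionalId on the coordinator. NOT Kafka code. */
final class EpochFencingSketch {

    enum State { EMPTY, ONGOING, COMPLETE_ABORT, COMPLETE_COMMIT }

    static final class ToyCoordinator {
        // Hypothetical switch: false models the behaviour described in the issue,
        // true models an epoch bump when the transaction expires.
        private final boolean bumpEpochOnExpiry;
        private State state = State.EMPTY;
        private int epoch = 0;
        private final Set<Integer> partitions = new TreeSet<>();

        ToyCoordinator(boolean bumpEpochOnExpiry) {
            this.bumpEpochOnExpiry = bumpEpochOnExpiry;
        }

        /** Add a partition; returns false when the caller's epoch is stale. */
        boolean addPartition(int producerEpoch, int partition) {
            if (producerEpoch != epoch) return false;
            // As in the quoted prepareAddPartitions, the target state is always
            // ONGOING, whatever the previous state was.
            state = State.ONGOING;
            partitions.add(partition);
            return true;
        }

        /** Coordinator-side timeout: abort and forget the partitions written so far. */
        void expire() {
            if (state != State.ONGOING) return;
            partitions.clear();
            state = State.COMPLETE_ABORT;
            if (bumpEpochOnExpiry) epoch++; // fences the producer holding the old epoch
        }

        /** Commit; returns the committed partitions, or null when rejected. */
        Set<Integer> commit(int producerEpoch) {
            if (producerEpoch != epoch || state != State.ONGOING) return null;
            Set<Integer> committed = new TreeSet<>(partitions);
            partitions.clear();
            state = State.COMPLETE_COMMIT;
            return committed;
        }
    }

    /** Replays the scenario from the issue and reports what the producer observes. */
    static String replay(boolean bumpEpochOnExpiry) {
        ToyCoordinator coordinator = new ToyCoordinator(bumpEpochOnExpiry);
        int producerEpoch = 0;                       // the producer never learns of a bump
        coordinator.addPartition(producerEpoch, 0);  // first message, partition 0
        coordinator.expire();                        // the transaction timeout elapses
        if (!coordinator.addPartition(producerEpoch, 1)) { // second message, partition 1
            return "fenced";
        }
        Set<Integer> committed = coordinator.commit(producerEpoch);
        return committed == null ? "fenced" : "committed " + committed;
    }

    public static void main(String[] args) {
        System.out.println("without epoch bump: " + replay(false));
        System.out.println("with epoch bump:    " + replay(true));
    }
}
```

In this model, the run without the bump commits only partition 1, so the message on partition 0 is lost without any error reaching the producer. The run with the bump rejects the stale producer, which turns the silent loss into a visible failure.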