[ 
https://issues.apache.org/jira/browse/KAFKA-6119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219548#comment-16219548
 ] 

Apurva Mehta commented on KAFKA-6119:
-------------------------------------

Thanks for the report.

Can you share the actual Kafka data logs for the partitions in question? Could 
you also share TRACE-level logging from the producer? 

The producer epoch is bumped on a transaction timeout, so any subsequent 
messages from the producer carrying the old epoch should result in a 
{{ProducerFencedException}}. In your program, the second send should therefore 
fail with a {{ProducerFencedException}}, and no further operations should be 
allowed on the producer. 
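To illustrate the expected behaviour, here is a minimal, self-contained sketch of the fencing rule described above. This is a toy simulation, not Kafka's actual classes; the class and method names are invented for illustration:

```java
// Toy model of epoch fencing: the coordinator bumps the producer epoch
// when it aborts a timed-out transaction, so any later write that still
// carries the stale epoch must be rejected (fenced).
public class EpochFencingDemo {
    static short coordinatorEpoch = 0;

    // Coordinator aborts a timed-out transaction and bumps the epoch.
    static void abortOnTimeout() {
        coordinatorEpoch++;
    }

    // A write is accepted only if the producer's epoch is current.
    static boolean accept(short producerEpoch) {
        return producerEpoch == coordinatorEpoch;
    }

    public static void main(String[] args) {
        short producerEpoch = coordinatorEpoch;     // producer initialized with epoch 0
        System.out.println(accept(producerEpoch));  // prints true: first send accepted
        abortOnTimeout();                           // transaction.timeout.ms elapses
        System.out.println(accept(producerEpoch));  // prints false: stale epoch is fenced
    }
}
```

In the reported bug, the second send is accepted instead of being fenced in this way, which is what allows the transaction to silently lose its first message.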

Thanks,
Apurva

> Silent Data Loss in Kafka011 Transactional Producer
> ---------------------------------------------------
>
>                 Key: KAFKA-6119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6119
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 0.11.0.0, 0.11.0.1
>         Environment: openjdk version "1.8.0_144"
> OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
> OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
>            Reporter: Gary Y.
>            Priority: Blocker
>              Labels: reliability
>
> Kafka can lose data published by a transactional {{KafkaProducer}} under some 
> circumstances, i.e., data that should be committed atomically may not be 
> fully visible from a consumer with {{read_committed}} isolation level.
>  
> *Steps to reproduce:*
> # Set {{transaction.timeout.ms}} to a low value such as {{100}} (ms).
> # Publish two messages in one transaction to different partitions of a topic, 
> with a sufficiently long pause in between the messages (e.g., 70 s).
> # Observe that only the second message is visible with the {{read_committed}} 
> isolation level.
> See 
> https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java
>  for a full example. Detailed instructions can be found in the {{README.md}}: 
> https://github.com/GJL/kafka011-transactional-producer-bug-demo
> *Why is this possible?*
> Because the transaction timeout is set to a low value, the transaction is 
> rolled back shortly after the first message is sent. Indeed, the following 
> logs can be found in the broker:
> {code}
> [2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized 
> transactionalId test-producer-1508964897483 with producerId 5 and producer 
> epoch 0 on partition __transaction_state-10 
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed 
> rollback ongoing transaction of transactionalId: test-producer-1508964897483 
> due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
> {code}
> After the rollback, the second message is sent to a different partition than 
> the first message. 
> Upon transaction commit, 
> {{org.apache.kafka.clients.producer.internals.TransactionManager}} may 
> enqueue the request {{addPartitionsToTransactionHandler}}:
> {code}
> private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
>     if (!newPartitionsInTransaction.isEmpty())
>         enqueueRequest(addPartitionsToTransactionHandler());
>     EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
>             producerIdAndEpoch.epoch, transactionResult);
>     EndTxnHandler handler = new EndTxnHandler(builder);
>     enqueueRequest(handler);
>     return handler.result;
> }
> {code}
> As can be seen, the request is enqueued if {{newPartitionsInTransaction}} 
> is non-empty. I suspect that this condition is satisfied because the second 
> message goes to a different partition than the first.
> In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}} may 
> eventually call {{TransactionMetadata#prepareAddPartitions}}:
> {code}
> def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
>   val newTxnStartTimestamp = state match {
>     case Empty | CompleteAbort | CompleteCommit => updateTimestamp
>     case _ => txnStartTimestamp
>   }
>   prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
>     newTxnStartTimestamp, updateTimestamp)
> }
> {code}
> Note that the first argument {{newState}} passed to {{prepareTransitionTo}} 
> is always *Ongoing* here. I suspect that this puts the transaction, which 
> should have been aborted, back into the _Ongoing_ state.
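The suspected transition can be illustrated with a toy model. This is a simplified sketch, not Kafka's actual Scala code; the class, enum, and method names are invented, and only the transition relevant to the report is modeled:

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the suspected bug: prepareAddPartitions unconditionally
// transitions the transaction to Ongoing, even from CompleteAbort, so a
// send after the timeout rollback silently revives the transaction with
// only the new partition in it.
public class TxnStateDemo {
    enum State { EMPTY, ONGOING, COMPLETE_ABORT }

    State state = State.EMPTY;
    Set<String> partitions = new HashSet<>();

    // Mirrors prepareAddPartitions: the target state is always Ongoing.
    void prepareAddPartitions(String partition) {
        partitions.add(partition);
        state = State.ONGOING;
    }

    // Coordinator rolls back the transaction after transaction.timeout.ms.
    void abortOnTimeout() {
        state = State.COMPLETE_ABORT;
        partitions.clear();
    }

    public static void main(String[] args) {
        TxnStateDemo txn = new TxnStateDemo();
        txn.prepareAddPartitions("topic-0");  // first send
        txn.abortOnTimeout();                 // timeout fires; first message aborted
        txn.prepareAddPartitions("topic-1");  // second send, different partition
        // The aborted transaction is Ongoing again, but it now contains only
        // the second partition: committing it loses the first message.
        System.out.println(txn.state + " " + txn.partitions);
    }
}
```

Under this model, a commit after the second send covers only {{topic-1}}, which matches the observation that only the second message is visible to a {{read_committed}} consumer.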



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
