[ https://issues.apache.org/jira/browse/KAFKA-6119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219620#comment-16219620 ]

Ted Yu edited comment on KAFKA-6119 at 10/25/17 10:08 PM:
----------------------------------------------------------

Looks like the new state shouldn't be hardcoded.
How about the following change?
{code}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 486a887..6c00149 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -208,11 +208,12 @@ private[transaction] class TransactionMetadata(val transactionalId: String,

   def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
     val newTxnStartTimestamp = state match {
-      case Empty | CompleteAbort | CompleteCommit => updateTimestamp
+      case Empty | CompleteAbort | PrepareAbort | CompleteCommit => updateTimestamp
       case _ => txnStartTimestamp
     }
+    val newState = if (state == CompleteAbort || state == PrepareAbort) state else Ongoing

-    prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
+    prepareTransitionTo(newState, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
       newTxnStartTimestamp, updateTimestamp)
   }
{code}
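To illustrate the intent of the change, here is a minimal sketch in plain Java, using a hypothetical simplified state enum rather than the broker's actual {{TransactionState}} types, contrasting the hardcoded {{Ongoing}} transition with one that keeps an abort sticky:

```java
// Hypothetical, simplified stand-in for the coordinator's transaction states;
// the real broker uses kafka.coordinator.transaction.TransactionState.
enum TxnState { EMPTY, ONGOING, PREPARE_ABORT, COMPLETE_ABORT, COMPLETE_COMMIT }

class TxnMetadataSketch {
    final TxnState state;

    TxnMetadataSketch(TxnState state) { this.state = state; }

    // Current behavior of prepareAddPartitions: the new state is always ONGOING,
    // even when the coordinator has already started aborting the transaction.
    TxnState nextStateHardcoded() {
        return TxnState.ONGOING;
    }

    // Proposed behavior: keep an in-flight or completed abort sticky instead of
    // silently reviving the transaction.
    TxnState nextStatePreservingAbort() {
        return (state == TxnState.COMPLETE_ABORT || state == TxnState.PREPARE_ABORT)
                ? state
                : TxnState.ONGOING;
    }
}
```

With the second variant, an AddPartitions request that arrives after a timeout-triggered abort would no longer flip the transaction back to an ongoing state.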


was (Author: yuzhih...@gmail.com):
Looks like the new state shouldn't be hardcoded.
How about the following change?
{code}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 486a887..6c00149 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -208,11 +208,12 @@ private[transaction] class TransactionMetadata(val transactionalId: String,

   def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
     val newTxnStartTimestamp = state match {
-      case Empty | CompleteAbort | CompleteCommit => updateTimestamp
+      case Empty | CompleteAbort | PrepareAbort | CompleteCommit => updateTimestamp
       case _ => txnStartTimestamp
     }
+    val newState = if (state == CompleteAbort || state == PrepareAbort) CompleteAbort else Ongoing

-    prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
+    prepareTransitionTo(newState, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
       newTxnStartTimestamp, updateTimestamp)
   }
{code}

> Silent Data Loss in Kafka011 Transactional Producer
> ---------------------------------------------------
>
>                 Key: KAFKA-6119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6119
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 0.11.0.0, 0.11.0.1
>         Environment: openjdk version "1.8.0_144"
> OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
> OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
>            Reporter: Gary Y.
>            Priority: Blocker
>              Labels: reliability
>
> Kafka can lose data published by a transactional {{KafkaProducer}} under some 
> circumstances, i.e., data that should be committed atomically may not be 
> fully visible from a consumer with {{read_committed}} isolation level.
>  
> *Steps to reproduce:*
> # Set {{transaction.timeout.ms}} to a low value such as {{100}}
> # Publish two messages in one transaction to different partitions of a topic 
> with a sufficiently long time in-between the messages (e.g., 70 s).
> # Only the second message is visible with {{read_committed}} isolation level.
> See 
> https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java
>  for a full example. Detailed instructions can be found in the {{README.md}}: 
> https://github.com/GJL/kafka011-transactional-producer-bug-demo
> *Why is this possible?*
> Because the transaction timeout is set to a low value, the transaction will 
> be rolled back quickly after the first message is sent. Indeed, in the broker 
> the following logs could be found:
> {code}
> [2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized transactionalId test-producer-1508964897483 with producerId 5 and producer epoch 0 on partition __transaction_state-10 (kafka.coordinator.transaction.TransactionCoordinator)
> [2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback ongoing transaction of transactionalId: test-producer-1508964897483 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
> {code}
> After rollback, the second message is sent to a different partition than the 
> first message. 
> Upon transaction commit, {{org.apache.kafka.clients.producer.internals.TransactionManager}} may enqueue the request built by {{addPartitionsToTransactionHandler}}:
> {code}
> private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
>     if (!newPartitionsInTransaction.isEmpty())
>         enqueueRequest(addPartitionsToTransactionHandler());
>     EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
>             producerIdAndEpoch.epoch, transactionResult);
>     EndTxnHandler handler = new EndTxnHandler(builder);
>     enqueueRequest(handler);
>     return handler.result;
> }
> {code}
> As can be seen, the handler is enqueued only if {{newPartitionsInTransaction}} is non-empty. I suspect that this condition is satisfied because the second message goes to a different partition, one not yet registered with the transaction.
> In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}} may 
> eventually call {{TransactionMetadata#prepareAddPartitions}}:
> {code}
> def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
>   val newTxnStartTimestamp = state match {
>     case Empty | CompleteAbort | CompleteCommit => updateTimestamp
>     case _ => txnStartTimestamp
>   }
>   prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
>     newTxnStartTimestamp, updateTimestamp)
> }
> {code}
> Note that the method's first argument, {{newState}}, is always *Ongoing* here. I suspect that this puts the transaction, which should be aborted, back to _Ongoing_ again.
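If the quoted analysis is right, the suspected coordinator-side sequence can be modeled with a toy state machine. All names here are hypothetical simplifications for illustration, not the actual broker classes:

```java
// Toy model of the suspected coordinator-side sequence; names are
// hypothetical simplifications of kafka.coordinator.transaction types.
import java.util.HashSet;
import java.util.Set;

enum State { EMPTY, ONGOING, COMPLETE_ABORT }

class TxnSketch {
    State state = State.EMPTY;
    Set<String> partitions = new HashSet<>();

    // Mirrors handleAddPartitionToTxnRequest -> prepareAddPartitions:
    // the new state is hardcoded to ONGOING regardless of the current state.
    void addPartition(String tp) {
        state = State.ONGOING;
        partitions.add(tp);
    }

    // Timeout-triggered rollback: the partitions written so far are aborted.
    void timeoutAbort() {
        state = State.COMPLETE_ABORT;
        partitions.clear();
    }
}
```

In this model, sending the second message after the timeout revives the aborted transaction as ongoing with only the second partition registered, so a subsequent commit covers only the second message, matching the observed {{read_committed}} behavior.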
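For reference, the configuration at the heart of the reproduction steps can be sketched as below. The broker address is an assumption, and the full send/sleep/send/commit flow lives in the linked App.java; only the properties are shown here:

```java
import java.util.Properties;

class ReproConfig {
    // Producer configuration sketch for the reproduction scenario.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("transactional.id", "test-producer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Step 1: a transaction timeout far below the ~70 s gap between sends,
        // so the coordinator rolls the transaction back mid-transaction.
        props.put("transaction.timeout.ms", "100");
        return props;
    }
}
```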



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
