[ https://issues.apache.org/jira/browse/KAFKA-5385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apurva Mehta updated KAFKA-5385:
--------------------------------
    Labels: exactly-once  (was: )

> Transactional Producer allows batches to expire and commits transactions regardless
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5385
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5385
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>            Priority: Blocker
>              Labels: exactly-once
>             Fix For: 0.11.0.0
>
>
> The transactions system test has revealed a data loss issue. When there is cluster instability, the transactional requests (AddPartitions and AddOffsets) can retry for a long time. When they eventually succeed, the commit message is dequeued, at which point we try to drain the accumulator. However, we then find that the batches have expired, and we just drop them but commit the transaction anyway. This causes data loss.
> The relevant portion of the producer log is here:
> {noformat}
> [2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,275] DEBUG [TransactionalId my-first-transactional-id] Enqueuing transactional request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,276] TRACE Expired 3 batches in accumulator (org.apache.kafka.clients.producer.internals.RecordAccumulator)
> [2017-06-06 01:07:36,286] TRACE Produced messages to topic-partition output-topic-0 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-0: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,424] TRACE Produced messages to topic-partition output-topic-1 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-1: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,436] TRACE Produced messages to topic-partition output-topic-2 with base offset offset -1 and error: {}. (org.apache.kafka.clients.producer.internals.ProducerBatch)
> org.apache.kafka.common.errors.TimeoutException: Expiring 250 record(s) for output-topic-2: 39080 ms has passed since batch creation plus linger time
> [2017-06-06 01:07:36,444] TRACE [TransactionalId my-first-transactional-id] Request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) dequeued for sending (org.apache.kafka.clients.producer.internals.TransactionManager)
> [2017-06-06 01:07:36,446] DEBUG [TransactionalId my-first-transactional-id] Sending transactional request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) to node knode04:9092 (id: 3 rack: null) (org.apache.kafka.clients.producer.internals.Sender)
> [2017-06-06 01:07:36,449] TRACE [TransactionalId my-first-transactional-id] Received transactional response EndTxnResponse(error=NOT_COORDINATOR, throttleTimeMs=0) for request (type=EndTxnRequest, transactionalId=my-first-transactional-id, producerId=1001, producerEpoch=0, result=COMMIT) (org.apache.kafka.clients.producer.internals.TransactionManager)
> {noformat}
> As you can see, the commit goes ahead even though the batches were never sent. In this test we lost 750 messages in the output topic, and they correspond exactly to the 750 messages in the input topic at the offsets covered by this portion of the log.
> The solution is either to never expire transactional batches, or to fail the transaction if any batches have expired.
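> For illustration only (this sketch is not part of the system test), the client-side contract this bug breaks looks roughly like the following; the bootstrap server is a placeholder, while the topic and transactional id mirror the log above. The expectation is that commitTransaction() surfaces a failure whenever a record in the transaction could not be written (for example because its batch expired with the TimeoutException above), so the application can abort and retry instead of silently losing data:
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.errors.AuthorizationException;
> import org.apache.kafka.common.errors.OutOfOrderSequenceException;
> import org.apache.kafka.common.errors.ProducerFencedException;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> public class TransactionalCopySketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-first-transactional-id");
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>
>         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>         producer.initTransactions();
>         try {
>             producer.beginTransaction();
>             for (int i = 0; i < 750; i++) {
>                 // Sends are buffered in the accumulator; under cluster instability these
>                 // batches can sit long enough to be expired with a TimeoutException.
>                 producer.send(new ProducerRecord<>("output-topic", Integer.toString(i)));
>             }
>             // Expected behaviour: if any batch in the transaction expired and was dropped,
>             // this call should fail so that no partial transaction is ever committed.
>             // The bug described above is that the commit succeeds anyway.
>             producer.commitTransaction();
>         } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
>             // Fatal errors: the producer cannot recover and must be closed.
>             producer.close();
>         } catch (KafkaException e) {
>             // Abortable errors: abort so nothing becomes visible to read_committed
>             // consumers, then the whole transaction can be retried.
>             producer.abortTransaction();
>         }
>         producer.close();
>     }
> }
> {code}
> Either proposed fix should preserve this contract: if transactional batches are never expired they will eventually be sent or fail explicitly, and if expiry instead puts the transaction into an error state, commitTransaction() would presumably raise that error rather than sending the EndTxnRequest(result=COMMIT) seen in the log.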
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)