William Parker created KAFKA-8674:
-------------------------------------
Summary: Records that exceed maximum size on the Java Producer
don't cause transaction failure
Key: KAFKA-8674
URL: https://issues.apache.org/jira/browse/KAFKA-8674
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 2.1.0
Reporter: William Parker
When using transactions, the [documentation for the Java producer's send
method|https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-]
states that
{code:java}
When used as part of a transaction, it is not necessary to define a callback or
check the result of the future in order to detect errors from send. If any of
the send calls failed with an irrecoverable error, the final
commitTransaction() call will fail and throw the exception from the last failed
send. When this happens, your application should call abortTransaction() to
reset the state and continue to send data.
{code}
However, when the message size is too large, this is not the behavior we have
observed; rather, the commitTransaction call succeeds and the oversized message
is not sent to the broker, although other messages succeed. A
KafkaProducer$FutureFailure is returned from the send method, and calling .get
on this future throws a RecordTooLargeException, but according to the
documentation this check of the future should not be necessary.
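The observed behavior can be pictured with a plain stdlib future (this is purely illustrative and uses java.util.concurrent.CompletableFuture, not Kafka's KafkaProducer$FutureFailure; the point it models is that an already-failed future surfaces its error only if the caller invokes .get):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureFailureDemo {
    public static void main(String[] args) {
        // A future that has already failed, loosely analogous to the
        // KafkaProducer$FutureFailure returned from send(): the error is
        // only visible if the caller bothers to call get().
        CompletableFuture<Long> sendResult = new CompletableFuture<>();
        sendResult.completeExceptionally(new RuntimeException("record too large"));

        try {
            sendResult.get();
        } catch (ExecutionException e) {
            // get() wraps the original failure cause in ExecutionException
            System.out.println("send failed: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Nothing else signals the failure; if get() is never called, the
        // error goes unnoticed -- which is exactly why the docs promise that
        // commitTransaction() would surface it instead.
    }
}
```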
I believe this occurs because the doSend method has an [ensureValidRecordSize
call|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L893]
which throws an exception early in its body, before later calls involving the
TransactionManager occur; in essence the fact that this send was attempted is
hidden from the TransactionManager. The logic that actually sends the record(s)
appears to be
[here|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L899],
pasted below:
{code:java}
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
{code}
Specifically, as I understand this process, the Sender [retrieves data from the
RecordAccumulator|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L342].
The
[TransactionManager|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java]
maintains state that indicates whether the transaction has failed, and when
the commitTransaction call is made this state is checked. This state appears to
be updated by the Sender, for example in
[failBatch|https://github.com/apache/kafka/blob/2.1.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L686].
However, the crucial point is that when ensureValidRecordSize throws an
exception, none of this interacting machinery (the RecordAccumulator, the
TransactionManager, and the Sender) is ever reached, so the transaction is
never marked as failed.
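The control flow I am describing can be sketched as a self-contained toy model (the class and method names below, such as ToyTransactionManager and recordSend, are invented for illustration and are not Kafka's actual internals; the structure mirrors doSend validating record size before the TransactionManager is told about the send):

```java
import java.util.ArrayList;
import java.util.List;

// Invented toy model of the described control flow: the size check throws
// before the transaction manager records the send, so a too-large record
// never causes the later commit to fail.
class ToyTransactionManager {
    private Exception lastError = null;

    void recordSend() { /* stands in for maybeAddPartitionToTransaction */ }

    void fail(Exception e) { lastError = e; } // set by the Sender on batch failure

    void commit() {
        if (lastError != null)
            throw new IllegalStateException("commit fails with: " + lastError.getMessage());
        // otherwise the commit "succeeds"
    }
}

public class ToySendDemo {
    static final int MAX_SIZE = 10;

    static void doSend(ToyTransactionManager tm, byte[] value, List<Exception> sendErrors) {
        // Mirrors ensureValidRecordSize: the early return happens before
        // tm.recordSend(), so the transaction manager never learns that
        // this send was attempted.
        if (value.length > MAX_SIZE) {
            sendErrors.add(new Exception("record too large: " + value.length));
            return; // the real producer returns a FutureFailure here
        }
        tm.recordSend();
    }

    public static void main(String[] args) {
        ToyTransactionManager tm = new ToyTransactionManager();
        List<Exception> sendErrors = new ArrayList<>();

        doSend(tm, new byte[100], sendErrors); // oversized: fails early
        doSend(tm, new byte[5], sendErrors);   // normal: reaches the tx manager

        tm.commit(); // succeeds despite the failed send -- the reported bug
        System.out.println("commit succeeded; failed sends: " + sendErrors.size());
    }
}
```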
It appears to me that Kafka is not behaving as specified by its docs. Are there
thoughts on whether the docs should be changed to reflect this behavior, or
whether the ensureValidRecordSize call should be changed to cause the
TransactionManager to fail? Alternatively, am I perhaps missing something? Is
this something you'd be interested in taking a patch for?
I found this bug on a cluster running Kafka 2.1.0, but the underlying bug seems
to exist in the doSend method in [version
2.3.0|https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L908]
and in the [trunk branch at time of
writing|https://github.com/apache/kafka/blob/d227f940489434c2f23491340d4399d98fd48d2d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L908].
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)