[
https://issues.apache.org/jira/browse/KAFKA-13694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498364#comment-17498364
]
RivenSun commented on KAFKA-13694:
----------------------------------
Hi [~guozhang]
Thank you for your reply.
The KafkaProducer code I tested locally is as follows:
{code:java}
// The two-argument constructor is (topic, value), so this record has no key.
ProducerRecord<String, String> record = new ProducerRecord<String, String>("rivenTest4",
        System.currentTimeMillis() + value);
Callback callback = new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            LOG.error("the producer has a error:" + e.getMessage());
        else {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
            System.out.println("The partition of the record we just sent is: " + metadata.partition());
        }
    }
};
producer.send(record, callback); {code}
With this code, the KafkaProducer callback does not learn the specific reason why the send failed.
producer.log
{code:java}
[kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One or more records have been rejected {code}
See the catch block at the end of the
ReplicaManager#appendToLocalLog(...) method:
{code:java}
catch {
  // NOTE: Failed produce requests metric is not incremented for known exceptions
  // it is supposed to indicate un-expected failures of a broker in handling a produce request
  case e@ (_: UnknownTopicOrPartitionException |
           _: NotLeaderOrFollowerException |
           _: RecordTooLargeException |
           _: RecordBatchTooLargeException |
           _: CorruptRecordException |
           _: KafkaStorageException) =>
    (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
  case rve: RecordValidationException =>
    val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
    val recordErrors = rve.recordErrors
    (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
      logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException)))
  case t: Throwable =>
    val logStartOffset = processFailedRecord(topicPartition, t)
    (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
} {code}
In the case rve: RecordValidationException branch, the server uses only
rve.invalidException, both when printing the log and when *filling the exception*
field in LogAppendResult. I understand that rve.recordErrors is indeed used when
constructing the LogAppendInfo field of LogAppendResult, but the KafkaClient does
not see the recordErrors of LogAppendInfo.
Compare this with the branch for known exceptions:
{code:java}
case e@ (_: UnknownTopicOrPartitionException |
         _: NotLeaderOrFollowerException |
         _: RecordTooLargeException |
         _: RecordBatchTooLargeException |
         _: CorruptRecordException |
         _: KafkaStorageException) =>
  (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) {code}
In this branch, the server does not pass a recordErrors parameter when
constructing LogAppendInfo.UnknownLogAppendInfo, yet the producer still
learns the specific failure reason, because the exception itself is specific.
producer2.log
{code:java}
[kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The request included a message larger than the max message size the server will accept. {code}
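As a rough illustration of the difference between the two logs, the sketch below models in plain Java how a generic rejection message could carry the per-record details. It is not Kafka code: the class RejectionMessageSketch, its nested RecordError, and the describe method are hypothetical names used only for this example, and the "missing key" text is an illustrative message, not the broker's actual wording.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these types are Kafka classes.
final class RejectionMessageSketch {

    // Stand-in for a per-record error: position in the batch plus a message.
    static final class RecordError {
        final int batchIndex;
        final String message;

        RecordError(int batchIndex, String message) {
            this.batchIndex = batchIndex;
            this.message = message;
        }

        @Override
        public String toString() {
            return "RecordError(batchIndex=" + batchIndex
                    + ", message=" + (message == null ? "null" : "'" + message + "'") + ")";
        }
    }

    // Returns the generic text, extended with per-record details when there are any.
    static String describe(List<RecordError> errors) {
        String generic = "One or more records have been rejected";
        if (errors == null || errors.isEmpty()) {
            return generic;
        }
        return generic + " due to " + errors;
    }

    public static void main(String[] args) {
        // "missing key" is an illustrative message, not the broker's actual text.
        List<RecordError> errors = Arrays.asList(new RecordError(0, "missing key"));
        System.out.println(describe(errors));
    }
}
```

With a message built this way, a callback that only logs e.getMessage(), like the one in my test code, would also show which record was rejected and why.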
> Some InvalidRecordException messages are thrown away
> ----------------------------------------------------
>
> Key: KAFKA-13694
> URL: https://issues.apache.org/jira/browse/KAFKA-13694
> Project: Kafka
> Issue Type: Improvement
> Components: clients, core
> Affects Versions: 3.0.0
> Reporter: RivenSun
> Priority: Major
>
> 1. Example
> Topic-level config: "cleanup.policy":"compact"
> However, the ProducerRecord sent by the producer does not specify a key.
>
> producer.log
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:One or more records have been rejected {code}
>
>
> server.log
> {code:java}
> [2022-02-25 02:14:54,411] ERROR [ReplicaManager broker=1] Error processing append operation on partition rivenTest4-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.InvalidRecordException: One or more records have been rejected {code}
> From the producer and server logs we cannot tell why the send failed, only
> that the message was rejected by the server.
> Compare this with the RecordTooLargeException test case: there the producer
> log states the reason for the failure clearly, and the server does not print
> a log at all (the reason is explained later).
> producer_message_too_large.log :
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The request included a message larger than the max message size the server will accept.
> [kafka-producer-network-thread | producer-1] ERROR us.zoom.mq.server.adapter.kafka.ProducerTest - the producer has a error:The request included a message larger than the max message size the server will accept. {code}
> 2. Root cause
> Call chain: ReplicaManager#appendToLocalLog(...) ->
> Partition#appendRecordsToLeader(...) ->
> UnifiedLog#appendAsLeader(...) -> UnifiedLog#append(...) ->
> LogValidator#validateMessagesAndAssignOffsets(...)
> 1) Analyzing validateMessagesAndAssignOffsets: the
> LogValidator#validateRecord method calls validateKey and validateTimestamp
> and collects the errors for all messages as a Seq[ApiRecordError].
> The subsequent processRecordErrors(recordErrors) method currently
> special-cases only Errors.INVALID_TIMESTAMP. Because the error returned by
> validateKey is the ordinary Errors.INVALID_RECORD, the code falls through to
> {code:java}
> else {
>   throw new RecordValidationException(new InvalidRecordException(
>     "One or more records have been rejected"), errors)
> }{code}
> In fact, the *errors* variable here contains the specific information for
> each recordError, but that information is not included in the message of the
> InvalidRecordException.
> 2) The exception thrown by processRecordErrors is caught by
> ReplicaManager#appendToLocalLog(...), so we continue with the catch block
> of appendToLocalLog.
> This block also shows why the server does not print a log for
> RecordTooLargeException: it is handled by the branch for known exceptions,
> which does not call processFailedRecord.
> Under case rve: RecordValidationException, the server prints the log in the
> processFailedRecord method and builds the result returned to the client
> with LogAppendResult.
> Both use only rve.invalidException. The server neither prints
> rve.recordErrors nor returns it to the client.
> 3. Solution
> There are two options; I prefer the second.
> 1) Similar to Errors.INVALID_TIMESTAMP, have the validateKey method return
> Errors.INVALID_RECORD_WITHOUT_KEY, and special-case
> Errors.INVALID_RECORD_WITHOUT_KEY in the processRecordErrors method as well.
> 2) Change the logic of the processRecordErrors method so that it no longer
> distinguishes between error types. {*}Even if new INVALID_RECORD types are
> added in the future{*}, we uniformly throw:
> {code:java}
> throw new RecordValidationException(new InvalidRecordException(
>   "One or more records have been rejected due to " + errors.toString()), errors) {code}
> We also need to add a toString() method to the ProduceResponse.RecordError class:
> {code:java}
> @Override
> public String toString() {
>     return "RecordError("
>             + "batchIndex=" + batchIndex
>             + ", message=" + ((message == null) ? "null" : "'" + message + "'")
>             + ")";
> } {code}
> The toString method of ProduceResponse.PartitionResponse already calls the
> toString method of ProduceResponse.RecordError, *but the
> RecordError#toString method has been missing until now.*
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)