Hi;
Im using asynchronous producer to send messages to the broker. If there is
no any error in my callback, i see it takes approximately 0.3 seconds to
produce message. But when I get below error [1], I see it takes 60 seconds
to produce message.
But I do not see any message loss . All messages are available in the
broker.
What causes this error? I see this delay in every 50 message I produced.
How can I increase the performance of the producer when I get this error?
*Code;*
producer.send(new ProducerRecord(topic, this), new ProducerCallback ());
private class ProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception ex)
{
if (ex != null) {
log.error("Error when publishing messages to the topic. Topic
:"+ recordMetadata.topic(),ex);
}
}
}
*Producer properties*
*acks=1*
*linger.ms <http://linger.ms>=10*
*batch.size=51200*
*bootstrap.servers=aukk1.xx.com <http://aukk1.xx.com>\:9092,aukk2.xx.com
<http://aukk2.xx.com>\:9092,aukk3.xx.com <http://aukk3.xx.com>\:9092*
*key.serializer=org.apache.kafka.common.serialization.StringSerializer*
*value.serializer=com.xx.KafkaPayloadSerializer*
[1]
04:51:20,025 ERROR
[org.apache.kafka.clients.producer.internals.RecordBatch]
(kafka-producer-network-thread | producer-673) Error executing
user-provided callback on message for topic-partition RAW_XML1harveyzhu-1::
java.lang.NullPointerException
at
com.lxx.kafkamodels.KafkaPayload$ProducerCallback.onCompletion(KafkaPayload.java:204)
at
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
at
org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:155)
at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:205)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
at java.lang.Thread.run(Thread.java:745)
Thanks.
--
-Ratha
http://vvratha.blogspot.com/