@Ismael:

I can handle TimeoutException in the callback. However, as per the documentation 
of Callback (link: 
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
TimeoutException is a retriable exception, and the docs say it "may be covered 
by increasing #.retries". So even if I get a TimeoutException in the callback, 
wouldn't the producer try to send the message again until all the retries are 
exhausted? Would it be safe to assume that message delivery has failed 
permanently just by encountering a TimeoutException in the callback?

Here is a snippet from the above-mentioned documentation:

"exception - The exception thrown during processing of this record. Null if no 
error occurred. Possible thrown exceptions include:

Non-Retriable exceptions (fatal, the message will never be sent):
InvalidTopicException, OffsetMetadataTooLargeException, 
RecordBatchTooLargeException, RecordTooLargeException, UnknownServerException

Retriable exceptions (transient, may be covered by increasing #.retries):
CorruptRecordException, InvalidMetadataException, 
NotEnoughReplicasAfterAppendException, NotEnoughReplicasException, 
OffsetOutOfRangeException, TimeoutException, UnknownTopicOrPartitionException"
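To illustrate my understanding of the contract: the producer only invokes the callback with a non-null exception after it has given up on the record, i.e. after the configured retries (if the exception was retriable) are spent. Here is a minimal self-contained sketch of that classification logic. Note the exception classes below are stand-ins I stubbed for illustration; real code would use TimeoutException/RetriableException from org.apache.kafka.common.errors:

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {

    // Stand-ins for org.apache.kafka.common.errors.* (stubbed here so the
    // sketch compiles without kafka-clients on the classpath).
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }
    static class TimeoutException extends RetriableException {
        TimeoutException(String msg) { super(msg); }
    }

    static final List<String> failedRecords = new ArrayList<>();

    // A non-null exception in the callback means the client has given up on
    // the record: for retriable exceptions, retries are already exhausted.
    static boolean isPermanentFailure(Exception e) {
        return e != null;
    }

    // Mirrors Callback.onCompletion: collect permanently failed records
    // (e.g. to log them or route them to a dead-letter file).
    static void onCompletion(String record, Exception e) {
        if (isPermanentFailure(e)) {
            failedRecords.add(record);
        }
    }

    public static void main(String[] args) {
        onCompletion("rec-1", null);                            // delivered
        onCompletion("rec-2", new TimeoutException("expired")); // failed after retries
        System.out.println(failedRecords);                      // prints [rec-2]
    }
}
```

If this reading of the callback contract is right, then treating any callback exception as a permanent failure is safe; that is exactly what I would like confirmed.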

@asaf: My Kafka API version is 0.10.0.1, so I think I should not face the 
issue you are mentioning. I mentioned the 0.9 documentation link by mistake.

Regards,
Vatsal
-----Original Message-----
From: Asaf Mesika [mailto:asaf.mes...@gmail.com] 
Sent: 02 December 2016 00:32
To: Kafka Users <users@kafka.apache.org>
Subject: Re: Detecting when all the retries are expired for a message

There's a critical bug in that area that was only fixed in 0.9.0.2, which has 
not been released yet. Without the fix it doesn't really retry.
I forked the Kafka repo, applied the fix, built it, and placed it in our own 
Nexus Maven repository until 0.9.0.2 is released.

https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

Feel free to use it.

On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote:

> The callback should give you what you are asking for. Has it not 
> worked as you expect when you tried it?
>
> Ismael
>
> On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> <mev...@sky.optymyze.com>
> wrote:
>
> > Hi,
> >
> >
> >
> > I am reading a file and dumping each record on Kafka. Here is my
> > producer code:
> >
> > public void produce(String topicName, String filePath,
> >         String bootstrapServers, String encoding) {
> >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> >             KafkaProducer<Object, String> producer =
> >                     initKafkaProducer(bootstrapServers)) {
> >         String line;
> >         while ((line = bf.readLine()) != null) {
> >             producer.send(new ProducerRecord<>(topicName, line),
> >                     (metadata, e) -> {
> >                         if (e != null) {
> >                             e.printStackTrace();
> >                         }
> >                     });
> >         }
> >         producer.flush();
> >     } catch (IOException e) {
> >         Throwables.propagate(e);
> >     }
> > }
> >
> > private static KafkaProducer<Object, String> initKafkaProducer(
> >         String bootstrapServer) {
> >     Properties properties = new Properties();
> >     properties.put("bootstrap.servers", bootstrapServer);
> >     properties.put("key.serializer",
> >             StringSerializer.class.getCanonicalName());
> >     properties.put("value.serializer",
> >             StringSerializer.class.getCanonicalName());
> >     properties.put("acks", "-1");
> >     properties.put("retries", 10);
> >     return new KafkaProducer<>(properties);
> > }
> >
> > private BufferedReader getBufferedReader(String filePath, String encoding)
> >         throws UnsupportedEncodingException, FileNotFoundException {
> >     return new BufferedReader(new InputStreamReader(
> >             new FileInputStream(filePath),
> >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > }
> >
> >
> >
> > As per the official documentation of Callback<https://kafka.apache.
> > org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > TimeoutException is a retriable exception. As I have set retries to
> > 10, the producer will try to resend the message if delivery of some
> > message fails with a TimeoutException. I am looking for a reliable
> > way to detect when delivery of a message has failed permanently
> > after all retries.
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
>