The callback is called after the retries have been exhausted.

Ismael
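
To make that concrete, here is a minimal sketch of one way to keep track of records that failed for good. It relies on the point above: the callback runs once per record, after the producer has stopped retrying, so a non-null exception there (TimeoutException included) can be treated as final. FailedRecords and its methods are hypothetical names used for illustration only, not part of the Kafka API; the class depends only on the JDK so it can be tried without a broker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical helper (not part of Kafka): collects records the producer gave up on. */
public class FailedRecords {

    /** One undelivered record together with the final error reported for it. */
    public static final class Failure {
        public final String record;
        public final Exception error;

        Failure(String record, Exception error) {
            this.record = record;
            this.error = error;
        }
    }

    // Send callbacks generally run on the producer's I/O thread,
    // so the list must be safe to read from the calling thread.
    private final List<Failure> failures = new CopyOnWriteArrayList<>();

    /**
     * Intended to be called from the send callback.
     * A null error means the record was acknowledged; anything else is
     * recorded as a permanent failure, since retries are already exhausted.
     */
    public void onCompletion(String record, Exception error) {
        if (error != null) {
            failures.add(new Failure(record, error));
        }
    }

    public boolean hasFailures() {
        return !failures.isEmpty();
    }

    public List<Failure> failures() {
        return failures;
    }
}
```

In the producer loop this could be wired in as `producer.send(new ProducerRecord<>(topicName, record), (metadata, e) -> failed.onCompletion(record, e));`, where `record` is a final copy of `line` (a lambda cannot capture the loop variable because it is reassigned). Once `producer.flush()` has returned, `failed.failures()` should hold whatever was not delivered.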
On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <mev...@sky.optymyze.com> wrote:

> @Ismael:
>
> I can handle TimeoutException in the callback. However as per the
> documentation of Callback(link:
> https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> TimeoutException is a retriable exception and it says that it "may be
> covered by increasing #.retries". So even if I get TimeoutException in
> callback, wouldn't it try to send message again until all the retries are
> done? Would it be safe to assume that message delivery is failed
> permanently just by encountering TimeoutException in callback?
>
> Here is a snippet from above mentioned documentation:
> "exception - The exception thrown during processing of this record. Null
> if no error occurred. Possible thrown exceptions include:
>
> Non-Retriable exceptions (fatal, the message will never be sent):
>   InvalidTopicException
>   OffsetMetadataTooLargeException
>   RecordBatchTooLargeException
>   RecordTooLargeException
>   UnknownServerException
>
> Retriable exceptions (transient, may be covered by increasing #.retries):
>   CorruptRecordException
>   InvalidMetadataException
>   NotEnoughReplicasAfterAppendException
>   NotEnoughReplicasException
>   OffsetOutOfRangeException
>   TimeoutException
>   UnknownTopicOrPartitionException"
>
> @asaf :My kafka - API version is 0.10.0.1. So I think I should not face
> the issue that you are mentioning. I mentioned documentation link of 0.9 by
> mistake.
>
> Regards,
> Vatsal
>
> -----Original Message-----
> From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
> Sent: 02 December 2016 00:32
> To: Kafka Users <users@kafka.apache.org>
> Subject: Re: Detecting when all the retries are expired for a message
>
> There's a critical bug in that section that has only been fixed in 0.9.0.2
> which has not been release yet. Without the fix it doesn't really retry.
> I forked the kafka repo, applied the fix, built it and placed it in our
> own Nexus Maven repository until 0.9.0.2 will be released.
>
> https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
>
> Feel free to use it.
>
> On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote:
>
> > The callback should give you what you are asking for. Has it not
> > worked as you expect when you tried it?
> >
> > Ismael
> >
> > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > <mev...@sky.optymyze.com> wrote:
> >
> > > Hi,
> > >
> > > I am reading a file and dumping each record on Kafka. Here is my
> > > producer code:
> > >
> > > public void produce(String topicName, String filePath,
> > >         String bootstrapServers, String encoding) {
> > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > >          KafkaProducer<Object, String> producer =
> > >                  initKafkaProducer(bootstrapServers)) {
> > >         String line;
> > >         while ((line = bf.readLine()) != null) {
> > >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> > >                 if (e != null) {
> > >                     e.printStackTrace();
> > >                 }
> > >             });
> > >         }
> > >         producer.flush();
> > >     } catch (IOException e) {
> > >         Throwables.propagate(e);
> > >     }
> > > }
> > >
> > > private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
> > >     Properties properties = new Properties();
> > >     properties.put("bootstrap.servers", bootstrapServer);
> > >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> > >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> > >     properties.put("acks", "-1");
> > >     properties.put("retries", 10);
> > >     return new KafkaProducer<>(properties);
> > > }
> > >
> > > private BufferedReader getBufferedReader(String filePath, String encoding)
> > >         throws UnsupportedEncodingException, FileNotFoundException {
> > >     return new BufferedReader(new InputStreamReader(
> > >             new FileInputStream(filePath),
> > >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > > }
> > >
> > > As per the official documentation of Callback
> > > <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > > TimeoutException is a retriable exception. As I have kept retries
> > > 10, producer will try to resend the message if delivering some
> > > message fails with TimeoutException. I am looking for some reliable
> > > to way to detect when delivery of a message is failed permanently
> > > after all retries.
> > >
> > > Regards,
> > >
> > > Vatsal