Hello, Bumping up this thread in case anyone of you have any say on this issue.
Regards, Vatsal -----Original Message----- From: Mevada, Vatsal Sent: 02 December 2016 16:16 To: Kafka Users <users@kafka.apache.org> Subject: RE: Detecting when all the retries are expired for a message I executed the same producer code for a single record file with following config: properties.put("bootstrap.servers", bootstrapServer); properties.put("key.serializer", StringSerializer.class.getCanonicalName()); properties.put("value.serializer", StringSerializer.class.getCanonicalName()); properties.put("acks", "-1"); properties.put("retries", 50000); properties.put("request.timeout.ms", 1); I have kept request.timeout.ms=1 to make sure that message delivery will fail with TimeoutException. Since the retries are 50000 then the program should take at-least 50000 ms (50 seconds) to complete for single record. However the program is completing almost instantly with only one callback with TimeoutException. I suspect that producer is not going for any retries. Or am I missing something in my code? My Kafka version is 0.10.0.1. Regards, Vatsal Am I missing any configuration or -----Original Message----- From: Ismael Juma [mailto:isma...@gmail.com] Sent: 02 December 2016 13:30 To: Kafka Users <users@kafka.apache.org> Subject: RE: Detecting when all the retries are expired for a message The callback is called after the retries have been exhausted. Ismael On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <mev...@sky.optymyze.com> wrote: > @Ismael: > > I can handle TimeoutException in the callback. However as per the > documentation of Callback(link: https://kafka.apache.org/0100/ > javadoc/org/apache/kafka/clients/producer/Callback.html), > TimeoutException is a retriable exception and it says that it "may be > covered by increasing #.retries". So even if I get TimeoutException in > callback, wouldn't it try to send message again until all the retries > are done? Would it be safe to assume that message delivery is failed > permanently just by encountering TimeoutException in callback? > > Here is a snippet from above mentioned documentation: > "exception - The exception thrown during processing of this record. > Null if no error occurred. Possible thrown exceptions include: > Non-Retriable exceptions (fatal, the message will never be sent): > InvalidTopicException OffsetMetadataTooLargeException > RecordBatchTooLargeException RecordTooLargeException > UnknownServerException Retriable exceptions (transient, may be covered > by increasing #.retries): CorruptRecordException > InvalidMetadataException NotEnoughReplicasAfterAppendException > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException > UnknownTopicOrPartitionException" > > @asaf :My kafka - API version is 0.10.0.1. So I think I should not > face the issue that you are mentioning. I mentioned documentation link > of 0.9 by mistake. > > Regards, > Vatsal > -----Original Message----- > From: Asaf Mesika [mailto:asaf.mes...@gmail.com] > Sent: 02 December 2016 00:32 > To: Kafka Users <users@kafka.apache.org> > Subject: Re: Detecting when all the retries are expired for a message > > There's a critical bug in that section that has only been fixed in > 0.9.0.2 which has not been release yet. Without the fix it doesn't really > retry. > I forked the kafka repo, applied the fix, built it and placed it in > our own Nexus Maven repository until 0.9.0.2 will be released. > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio > > Feel free to use it. > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote: > > > The callback should give you what you are asking for. Has it not > > worked as you expect when you tried it? > > > > Ismael > > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal > > <mev...@sky.optymyze.com> > > wrote: > > > > > Hi, > > > > > > > > > > > > I am reading a file and dumping each record on Kafka. Here is my > > > producer > > > code: > > > > > > > > > > > > public void produce(String topicName, String filePath, String > > > bootstrapServers, String encoding) { > > > > > > try (BufferedReader bf = > > > getBufferedReader(filePath, encoding); > > > > > > KafkaProducer<Object, String> > > > producer = > > > initKafkaProducer(bootstrapServers)) { > > > > > > String line; > > > > > > while ((line = bf.readLine()) != > > > null) { > > > > > > producer.send(new > > > ProducerRecord<>(topicName, line), (metadata, e) -> { > > > > > > if > > > (e != > > > null) { > > > > > > > > > e.printStackTrace(); > > > > > > } > > > > > > }); > > > > > > } > > > > > > producer.flush(); > > > > > > } catch (IOException e) { > > > > > > Throwables.propagate(e); > > > > > > } > > > > > > } > > > > > > > > > > > > private static KafkaProducer<Object, String> > > > initKafkaProducer(String > > > bootstrapServer) { > > > > > > Properties properties = new Properties(); > > > > > > properties.put("bootstrap.servers", > > > bootstrapServer); > > > > > > properties.put("key.serializer", > StringSerializer.class. > > > getCanonicalName()); > > > > > > properties.put("value.serializer", > > StringSerializer.class. > > > getCanonicalName()); > > > > > > properties.put("acks", "-1"); > > > > > > properties.put("retries", 10); > > > > > > return new KafkaProducer<>(properties); > > > > > > } > > > > > > > > > > > > private BufferedReader getBufferedReader(String filePath, String > > encoding) > > > throws UnsupportedEncodingException, FileNotFoundException { > > > > > > return new BufferedReader(new > > > InputStreamReader(new FileInputStream(filePath), > > > Optional.ofNullable(encoding). > > > orElse("UTF-8"))); > > > > > > } > > > > > > > > > > > > As per the official documentation of Callback<https://kafka.apache. > > > org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>, > > > TimeoutException is a retriable exception. As I have kept retries > > > 10, producer will try to resend the message if delivering some > > > message fails with TimeoutException. I am looking for some > > > reliable to way to detect > > when > > > delivery of a message is failed permanently after all retries. > > > > > > > > > > > > Regards, > > > > > > Vatsal > > > > > >