Note that Sumant has been working on a KIP proposal to make the producer timeout behaviour more intuitive:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer Ismael On Wed, Dec 7, 2016 at 9:42 AM, Rajini Sivaram <rajinisiva...@googlemail.com > wrote: > If you just want to test retries, you could restart Kafka while the > producer is running and you should see the producer retry while Kafka is > down/leader is being elected after Kafka restarts. If you specifically want > a TimeoutException to trigger all retries, I am not sure how you can. I > would suggest that you raise a JIRA since the current behaviour is not very > intuitive. > > > On Wed, Dec 7, 2016 at 6:51 AM, Mevada, Vatsal <mev...@sky.optymyze.com> > wrote: > > > @Asaf > > > > > > > > Do I need to raise new bug for this? > > > > > > > > @Rajini > > > > > > > > Please suggest some the configuration with which retries should work > > according to you. The code is already there in the mail chain. I am > adding > > it here again: > > > > > > > > public void produce(String topicName, String filePath, String > > bootstrapServers, String encoding) { > > > > try (BufferedReader bf = getBufferedReader(filePath, > > encoding); > > > > KafkaProducer<Object, String> producer = > > initKafkaProducer(bootstrapServers)) { > > > > String line; > > > > while ((line = bf.readLine()) != null) { > > > > producer.send(new > > ProducerRecord<>(topicName, line), (metadata, e) -> { > > > > if (e != > > null) { > > > > > > e.printStackTrace(); > > > > } > > > > }); > > > > } > > > > producer.flush(); > > > > } catch (IOException e) { > > > > Throwables.propagate(e); > > > > } > > > > } > > > > > > > > private static KafkaProducer<Object, String> initKafkaProducer(String > > bootstrapServer) { > > > > Properties properties = new Properties(); > > > > properties.put("bootstrap.servers", bootstrapServer); > > > > properties.put("key.serializer", StringSerializer.class. > > getCanonicalName()); > > > > properties.put("value.serializer",StringSerializer. > > class.getCanonicalName()); > > > > properties.put("acks", "-1"); > > > > properties.put("retries", 50000); > > > > properties.put("request.timeout.ms", 1); > > > > return new KafkaProducer<>(properties); > > > > } > > > > > > > > private BufferedReader getBufferedReader(String filePath, String > encoding) > > throws UnsupportedEncodingException, FileNotFoundException { > > > > return new BufferedReader(new InputStreamReader(new > > FileInputStream(filePath), Optional.ofNullable(encoding). > > orElse("UTF-8"))); > > > > } > > > > > > > > Regards, > > > > Vatsal > > > > > > > > -----Original Message----- > > From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com] > > Sent: 06 December 2016 17:27 > > To: users@kafka.apache.org > > Subject: Re: Detecting when all the retries are expired for a message > > > > > > > > I believe batches in RecordAccumulator are expired after > > request.timeout.ms, so they wouldn't get retried in this case. I think > > the config options are quite confusing, making it hard to figure out the > > behavior without looking into the code. > > > > > > > > On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika <asaf.mes...@gmail.com > > <mailto:asaf.mes...@gmail.com>> wrote: > > > > > > > > > Vatsal: > > > > > > > > > > I don't think they merged the fix for this bug (retries doesn't work) > > > > > in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547 > > > > > > > > > > > > > > > On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal > > > > > <mev...@sky.optymyze.com<mailto:mev...@sky.optymyze.com>> > > > > > wrote: > > > > > > > > > > > Hello, > > > > > > > > > > > > Bumping up this thread in case anyone of you have any say on this > > issue. > > > > > > > > > > > > Regards, > > > > > > Vatsal > > > > > > > > > > > > -----Original Message----- > > > > > > From: Mevada, Vatsal > > > > > > Sent: 02 December 2016 16:16 > > > > > > To: Kafka Users <users@kafka.apache.org<mailto: > users@kafka.apache.org> > > > > > > > > > Subject: RE: Detecting when all the retries are expired for a > > > > > > message > > > > > > > > > > > > I executed the same producer code for a single record file with > > > > > > following > > > > > > config: > > > > > > > > > > > > properties.put("bootstrap.servers", bootstrapServer); > > > > > > properties.put("key.serializer", > > > > > > StringSerializer.class.getCanonicalName()); > > > > > > properties.put("value.serializer", > > > > > > StringSerializer.class.getCanonicalName()); > > > > > > properties.put("acks", "-1"); > > > > > > properties.put("retries", 50000); > > > > > > properties.put("request.timeout.ms", 1); > > > > > > > > > > > > I have kept request.timeout.ms=1 to make sure that message delivery > > > > > > will fail with TimeoutException. Since the retries are 50000 then > > > > > > the program should take at-least 50000 ms (50 seconds) to complete > for > > single record. > > > > > > However the program is completing almost instantly with only one > > > > > > callback with TimeoutException. I suspect that producer is not going > > > > > > for any retries. Or am I missing something in my code? > > > > > > > > > > > > My Kafka version is 0.10.0.1. > > > > > > > > > > > > Regards, > > > > > > Vatsal > > > > > > Am I missing any configuration or > > > > > > -----Original Message----- > > > > > > From: Ismael Juma [mailto:isma...@gmail.com] > > > > > > Sent: 02 December 2016 13:30 > > > > > > To: Kafka Users <users@kafka.apache.org<mailto: > users@kafka.apache.org> > > > > > > > > > Subject: RE: Detecting when all the retries are expired for a > > > > > > message > > > > > > > > > > > > The callback is called after the retries have been exhausted. > > > > > > > > > > > > Ismael > > > > > > > > > > > > On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <mev...@sky.optymyze.com< > > mailto:mev...@sky.optymyze.com>> wrote: > > > > > > > > > > > > > @Ismael: > > > > > > > > > > > > > > I can handle TimeoutException in the callback. However as per the > > > > > > > documentation of Callback(link: https://kafka.apache.org/0100/ > > > > > > > javadoc/org/apache/kafka/clients/producer/Callback.html), > > > > > > > TimeoutException is a retriable exception and it says that it "may > > > > > > > be covered by increasing #.retries". So even if I get > > > > > > > TimeoutException in callback, wouldn't it try to send message > > > > > > > again until all the retries are done? Would it be safe to assume > > > > > > > that message delivery is failed permanently just by encountering > > TimeoutException in callback? > > > > > > > > > > > > > > Here is a snippet from above mentioned documentation: > > > > > > > "exception - The exception thrown during processing of this record. > > > > > > > Null if no error occurred. Possible thrown exceptions include: > > > > > > > Non-Retriable exceptions (fatal, the message will never be sent): > > > > > > > InvalidTopicException OffsetMetadataTooLargeException > > > > > > > RecordBatchTooLargeException RecordTooLargeException > > > > > > > UnknownServerException Retriable exceptions (transient, may be > > > > > > > covered by increasing #.retries): CorruptRecordException > > > > > > > InvalidMetadataException NotEnoughReplicasAfterAppendException > > > > > > > NotEnoughReplicasException OffsetOutOfRangeException > > > > > > > TimeoutException UnknownTopicOrPartitionException" > > > > > > > > > > > > > > @asaf :My kafka - API version is 0.10.0.1. So I think I should not > > > > > > > face the issue that you are mentioning. I mentioned documentation > > > > > > > link of 0.9 by mistake. > > > > > > > > > > > > > > Regards, > > > > > > > Vatsal > > > > > > > -----Original Message----- > > > > > > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com] > > > > > > > Sent: 02 December 2016 00:32 > > > > > > > To: Kafka Users <users@kafka.apache.org<mailto: > > users@kafka.apache.org>> > > > > > > > Subject: Re: Detecting when all the retries are expired for a > > > > > > > message > > > > > > > > > > > > > > There's a critical bug in that section that has only been fixed in > > > > > > > 0.9.0.2 which has not been release yet. Without the fix it doesn't > > > > > > really retry. > > > > > > > I forked the kafka repo, applied the fix, built it and placed it > > > > > > > in our own Nexus Maven repository until 0.9.0.2 will be released. > > > > > > > > > > > > > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio > > > > > > > > > > > > > > Feel free to use it. > > > > > > > > > > > > > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk > > <mailto:ism...@juma.me.uk>> wrote: > > > > > > > > > > > > > > > The callback should give you what you are asking for. Has it not > > > > > > > > worked as you expect when you tried it? > > > > > > > > > > > > > > > > Ismael > > > > > > > > > > > > > > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal > > > > > > > > <mev...@sky.optymyze.com<mailto:mev...@sky.optymyze.com>> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I am reading a file and dumping each record on Kafka. Here is > > > > > > > > > my producer > > > > > > > > > code: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > public void produce(String topicName, String filePath, String > > > > > > > > > bootstrapServers, String encoding) { > > > > > > > > > > > > > > > > > > try (BufferedReader bf = > > > > > > > > > getBufferedReader(filePath, encoding); > > > > > > > > > > > > > > > > > > KafkaProducer<Object, String> > > > > > > > > > producer = > > > > > > > > > initKafkaProducer(bootstrapServers)) { > > > > > > > > > > > > > > > > > > String line; > > > > > > > > > > > > > > > > > > while ((line = bf.readLine()) > > > > > > > > > != > > > > > > > > > null) { > > > > > > > > > > > > > > > > > > > > > > > > > > > producer.send(new ProducerRecord<>(topicName, line), > > > > > > > > > (metadata, e) -> { > > > > > > > > > > > > > > > > > > > > > > > > > > > if (e != > > > > > > > > > null) { > > > > > > > > > > > > > > > > > > > > > > > > > > > e.printStackTrace(); > > > > > > > > > > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > }); > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > producer.flush(); > > > > > > > > > > > > > > > > > > } catch (IOException e) { > > > > > > > > > > > > > > > > > > Throwables.propagate(e); > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > private static KafkaProducer<Object, String> > > > > > > > > > initKafkaProducer(String > > > > > > > > > bootstrapServer) { > > > > > > > > > > > > > > > > > > Properties properties = new Properties(); > > > > > > > > > > > > > > > > > > properties.put("bootstrap.servers", > > > > > > > > > bootstrapServer); > > > > > > > > > > > > > > > > > > properties.put("key.serializer", > > > > > > > StringSerializer.class. > > > > > > > > > getCanonicalName()); > > > > > > > > > > > > > > > > > > properties.put("value.serializer", > > > > > > > > StringSerializer.class. > > > > > > > > > getCanonicalName()); > > > > > > > > > > > > > > > > > > properties.put("acks", "-1"); > > > > > > > > > > > > > > > > > > properties.put("retries", 10); > > > > > > > > > > > > > > > > > > return new KafkaProducer<>(properties); > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > private BufferedReader getBufferedReader(String filePath, > > > > > > > > > String > > > > > > > > encoding) > > > > > > > > > throws UnsupportedEncodingException, FileNotFoundException { > > > > > > > > > > > > > > > > > > return new BufferedReader(new > > > > > > > > > InputStreamReader(new FileInputStream(filePath), > > > > > > Optional.ofNullable(encoding). > > > > > > > > > orElse("UTF-8"))); > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > As per the official documentation of > > > > > > > > > Callback<https://kafka.apache > > > > > . > > > > > > > > > org/090/javadoc/org/apache/kafka/clients/producer/Callback.htm > > > > > > > > > l>, TimeoutException is a retriable exception. As I have kept > > > > > > > > > retries 10, producer will try to resend the message if > > > > > > > > > delivering some message fails with TimeoutException. I am > > > > > > > > > looking for some reliable to way to detect > > > > > > > > when > > > > > > > > > delivery of a message is failed permanently after all retries. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > > > > > > > > > > Vatsal > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Regards, > > > > > > > > Rajini > > > > > > -- > Regards, > > Rajini >