If you just want to test retries, you could restart Kafka while the
producer is running and you should see the producer retry while Kafka is
down/leader is being elected after Kafka restarts. If you specifically want
a TimeoutException to trigger all retries, I am not sure how you can. I
would suggest that you raise a JIRA since the current behaviour is not very
intuitive.


On Wed, Dec 7, 2016 at 6:51 AM, Mevada, Vatsal <mev...@sky.optymyze.com>
wrote:

> @Asaf
>
>
>
> Do I need to raise new bug for this?
>
>
>
> @Rajini
>
>
>
> Please suggest some the configuration with which retries should work
> according to you. The code is already there in the mail chain. I am adding
> it here again:
>
>
>
> public void produce(String topicName, String filePath, String
> bootstrapServers, String encoding) {
>
>                 try (BufferedReader bf = getBufferedReader(filePath,
> encoding);
>
>                                 KafkaProducer<Object, String> producer =
> initKafkaProducer(bootstrapServers)) {
>
>                                 String line;
>
>                                 while ((line = bf.readLine()) != null) {
>
>                                                 producer.send(new
> ProducerRecord<>(topicName, line), (metadata, e) -> {
>
>                                                                 if (e !=
> null) {
>
>
>       e.printStackTrace();
>
>                                                                 }
>
>                                                 });
>
>                                 }
>
>                                 producer.flush();
>
>                 } catch (IOException e) {
>
>                                 Throwables.propagate(e);
>
>                 }
>
> }
>
>
>
> private static KafkaProducer<Object, String> initKafkaProducer(String
> bootstrapServer) {
>
>                 Properties properties = new Properties();
>
>                 properties.put("bootstrap.servers", bootstrapServer);
>
>                 properties.put("key.serializer", StringSerializer.class.
> getCanonicalName());
>
>                 properties.put("value.serializer",StringSerializer.
> class.getCanonicalName());
>
>                 properties.put("acks", "-1");
>
>                 properties.put("retries", 50000);
>
>                 properties.put("request.timeout.ms", 1);
>
>                 return new KafkaProducer<>(properties);
>
> }
>
>
>
> private BufferedReader getBufferedReader(String filePath, String encoding)
> throws UnsupportedEncodingException, FileNotFoundException {
>
>                 return new BufferedReader(new InputStreamReader(new
> FileInputStream(filePath), Optional.ofNullable(encoding).
> orElse("UTF-8")));
>
> }
>
>
>
> Regards,
>
> Vatsal
>
>
>
> -----Original Message-----
> From: Rajini Sivaram [mailto:rajinisiva...@googlemail.com]
> Sent: 06 December 2016 17:27
> To: users@kafka.apache.org
> Subject: Re: Detecting when all the retries are expired for a message
>
>
>
> I believe batches in RecordAccumulator are expired after
> request.timeout.ms, so they wouldn't get retried in this case. I think
> the config options are quite confusing, making it hard to figure out the
> behavior without looking into the code.
>
>
>
> On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika <asaf.mes...@gmail.com
> <mailto:asaf.mes...@gmail.com>> wrote:
>
>
>
> > Vatsal:
>
> >
>
> > I don't think they merged the fix for this bug (retries doesn't work)
>
> > in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
> >
>
> >
>
> > On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal
>
> > <mev...@sky.optymyze.com<mailto:mev...@sky.optymyze.com>>
>
> > wrote:
>
> >
>
> > > Hello,
>
> > >
>
> > > Bumping up this thread in case anyone of you have any say on this
> issue.
>
> > >
>
> > > Regards,
>
> > > Vatsal
>
> > >
>
> > > -----Original Message-----
>
> > > From: Mevada, Vatsal
>
> > > Sent: 02 December 2016 16:16
>
> > > To: Kafka Users <users@kafka.apache.org<mailto:users@kafka.apache.org>
> >
>
> > > Subject: RE: Detecting when all the retries are expired for a
>
> > > message
>
> > >
>
> > > I executed the same producer code for a single record file with
>
> > > following
>
> > > config:
>
> > >
>
> > >         properties.put("bootstrap.servers", bootstrapServer);
>
> > >         properties.put("key.serializer",
>
> > > StringSerializer.class.getCanonicalName());
>
> > >         properties.put("value.serializer",
>
> > > StringSerializer.class.getCanonicalName());
>
> > >         properties.put("acks", "-1");
>
> > >         properties.put("retries", 50000);
>
> > >         properties.put("request.timeout.ms", 1);
>
> > >
>
> > > I have kept request.timeout.ms=1 to make sure that message delivery
>
> > > will fail with TimeoutException. Since the retries are 50000 then
>
> > > the program should take at-least 50000 ms (50 seconds) to complete for
> single record.
>
> > > However the program is completing almost instantly with only one
>
> > > callback with TimeoutException. I suspect that producer is not going
>
> > > for any retries. Or am I missing something in my code?
>
> > >
>
> > > My Kafka version is 0.10.0.1.
>
> > >
>
> > > Regards,
>
> > > Vatsal
>
> > > Am I missing any configuration or
>
> > > -----Original Message-----
>
> > > From: Ismael Juma [mailto:isma...@gmail.com]
>
> > > Sent: 02 December 2016 13:30
>
> > > To: Kafka Users <users@kafka.apache.org<mailto:users@kafka.apache.org>
> >
>
> > > Subject: RE: Detecting when all the retries are expired for a
>
> > > message
>
> > >
>
> > > The callback is called after the retries have been exhausted.
>
> > >
>
> > > Ismael
>
> > >
>
> > > On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <mev...@sky.optymyze.com<
> mailto:mev...@sky.optymyze.com>> wrote:
>
> > >
>
> > > > @Ismael:
>
> > > >
>
> > > > I can handle TimeoutException in the callback. However as per the
>
> > > > documentation of Callback(link: https://kafka.apache.org/0100/
>
> > > > javadoc/org/apache/kafka/clients/producer/Callback.html),
>
> > > > TimeoutException is a retriable exception and it says that it "may
>
> > > > be covered by increasing #.retries". So even if I get
>
> > > > TimeoutException in callback, wouldn't it try to send message
>
> > > > again until all the retries are done? Would it be safe to assume
>
> > > > that message delivery is failed permanently just by encountering
> TimeoutException in callback?
>
> > > >
>
> > > > Here is a snippet from above mentioned documentation:
>
> > > > "exception - The exception thrown during processing of this record.
>
> > > > Null if no error occurred. Possible thrown exceptions include:
>
> > > > Non-Retriable exceptions (fatal, the message will never be sent):
>
> > > > InvalidTopicException OffsetMetadataTooLargeException
>
> > > > RecordBatchTooLargeException RecordTooLargeException
>
> > > > UnknownServerException Retriable exceptions (transient, may be
>
> > > > covered by increasing #.retries): CorruptRecordException
>
> > > > InvalidMetadataException NotEnoughReplicasAfterAppendException
>
> > > > NotEnoughReplicasException OffsetOutOfRangeException
>
> > > > TimeoutException UnknownTopicOrPartitionException"
>
> > > >
>
> > > > @asaf :My kafka - API version is 0.10.0.1. So I think I should not
>
> > > > face the issue that you are mentioning. I mentioned documentation
>
> > > > link of 0.9 by mistake.
>
> > > >
>
> > > > Regards,
>
> > > > Vatsal
>
> > > > -----Original Message-----
>
> > > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com]
>
> > > > Sent: 02 December 2016 00:32
>
> > > > To: Kafka Users <users@kafka.apache.org<mailto:
> users@kafka.apache.org>>
>
> > > > Subject: Re: Detecting when all the retries are expired for a
>
> > > > message
>
> > > >
>
> > > > There's a critical bug in that section that has only been fixed in
>
> > > > 0.9.0.2 which has not been release yet. Without the fix it doesn't
>
> > > really retry.
>
> > > > I forked the kafka repo, applied the fix, built it and placed it
>
> > > > in our own Nexus Maven repository until 0.9.0.2 will be released.
>
> > > >
>
> > > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
>
> > > >
>
> > > > Feel free to use it.
>
> > > >
>
> > > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk
> <mailto:ism...@juma.me.uk>> wrote:
>
> > > >
>
> > > > > The callback should give you what you are asking for. Has it not
>
> > > > > worked as you expect when you tried it?
>
> > > > >
>
> > > > > Ismael
>
> > > > >
>
> > > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
>
> > > > > <mev...@sky.optymyze.com<mailto:mev...@sky.optymyze.com>>
>
> > > > > wrote:
>
> > > > >
>
> > > > > > Hi,
>
> > > > > >
>
> > > > > >
>
> > > > > >
>
> > > > > > I am reading a file and dumping each record on Kafka. Here is
>
> > > > > > my producer
>
> > > > > > code:
>
> > > > > >
>
> > > > > >
>
> > > > > >
>
> > > > > > public void produce(String topicName, String filePath, String
>
> > > > > > bootstrapServers, String encoding) {
>
> > > > > >
>
> > > > > >                 try (BufferedReader bf =
>
> > > > > > getBufferedReader(filePath, encoding);
>
> > > > > >
>
> > > > > >                                 KafkaProducer<Object, String>
>
> > > > > > producer =
>
> > > > > > initKafkaProducer(bootstrapServers)) {
>
> > > > > >
>
> > > > > >                                 String line;
>
> > > > > >
>
> > > > > >                                 while ((line = bf.readLine())
>
> > > > > > !=
>
> > > > > > null) {
>
> > > > > >
>
> > > > > >
>
> > > > > > producer.send(new ProducerRecord<>(topicName, line),
>
> > > > > > (metadata, e) -> {
>
> > > > > >
>
> > > > > >
>
> > > > > > if (e !=
>
> > > > > > null) {
>
> > > > > >
>
> > > > > >
>
> > > > > >       e.printStackTrace();
>
> > > > > >
>
> > > > > >
>
> > > > > > }
>
> > > > > >
>
> > > > > >                                                 });
>
> > > > > >
>
> > > > > >                                 }
>
> > > > > >
>
> > > > > >                                 producer.flush();
>
> > > > > >
>
> > > > > >                 } catch (IOException e) {
>
> > > > > >
>
> > > > > >                                 Throwables.propagate(e);
>
> > > > > >
>
> > > > > >                 }
>
> > > > > >
>
> > > > > > }
>
> > > > > >
>
> > > > > >
>
> > > > > >
>
> > > > > > private static KafkaProducer<Object, String>
>
> > > > > > initKafkaProducer(String
>
> > > > > > bootstrapServer) {
>
> > > > > >
>
> > > > > >                 Properties properties = new Properties();
>
> > > > > >
>
> > > > > >                 properties.put("bootstrap.servers",
>
> > > > > > bootstrapServer);
>
> > > > > >
>
> > > > > >                 properties.put("key.serializer",
>
> > > > StringSerializer.class.
>
> > > > > > getCanonicalName());
>
> > > > > >
>
> > > > > >                 properties.put("value.serializer",
>
> > > > > StringSerializer.class.
>
> > > > > > getCanonicalName());
>
> > > > > >
>
> > > > > >                 properties.put("acks", "-1");
>
> > > > > >
>
> > > > > >                 properties.put("retries", 10);
>
> > > > > >
>
> > > > > >                 return new KafkaProducer<>(properties);
>
> > > > > >
>
> > > > > > }
>
> > > > > >
>
> > > > > >
>
> > > > > >
>
> > > > > > private BufferedReader getBufferedReader(String filePath,
>
> > > > > > String
>
> > > > > encoding)
>
> > > > > > throws UnsupportedEncodingException, FileNotFoundException {
>
> > > > > >
>
> > > > > >                 return new BufferedReader(new
>
> > > > > > InputStreamReader(new FileInputStream(filePath),
>
> > > Optional.ofNullable(encoding).
>
> > > > > > orElse("UTF-8")));
>
> > > > > >
>
> > > > > > }
>
> > > > > >
>
> > > > > >
>
> > > > > >
>
> > > > > > As per the official documentation of
>
> > > > > > Callback<https://kafka.apache
>
> > .
>
> > > > > > org/090/javadoc/org/apache/kafka/clients/producer/Callback.htm
>
> > > > > > l>, TimeoutException is a retriable exception. As I have kept
>
> > > > > > retries 10, producer will try to resend the message if
>
> > > > > > delivering some message fails with TimeoutException. I am
>
> > > > > > looking for some reliable to way to detect
>
> > > > > when
>
> > > > > > delivery of a message is failed permanently after all retries.
>
> > > > > >
>
> > > > > >
>
> > > > > >
>
> > > > > > Regards,
>
> > > > > >
>
> > > > > > Vatsal
>
> > > > > >
>
> > > > >
>
> > > >
>
> > >
>
> >
>
>
>
>
>
>
>
> --
>
> Regards,
>
>
>
> Rajini
>



-- 
Regards,

Rajini

Reply via email to