There's a critical bug in that section that has only been fixed in 0.9.0.2
which has not been release yet. Without the fix it doesn't really retry.
I forked the kafka repo, applied the fix, built it and placed it in our own
Nexus Maven repository until 0.9.0.2 will be released.

https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

Feel free to use it.

On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote:

> The callback should give you what you are asking for. Has it not worked as
> you expect when you tried it?
>
> Ismael
>
> On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal <mev...@sky.optymyze.com>
> wrote:
>
> > Hi,
> >
> >
> >
> > I am reading a file and dumping each record on Kafka. Here is my producer
> > code:
> >
> >
> >
> > public void produce(String topicName, String filePath, String
> > bootstrapServers, String encoding) {
> >
> >                 try (BufferedReader bf = getBufferedReader(filePath,
> > encoding);
> >
> >                                 KafkaProducer<Object, String> producer =
> > initKafkaProducer(bootstrapServers)) {
> >
> >                                 String line;
> >
> >                                 while ((line = bf.readLine()) != null) {
> >
> >                                                 producer.send(new
> > ProducerRecord<>(topicName, line), (metadata, e) -> {
> >
> >                                                                 if (e !=
> > null) {
> >
> >
> >       e.printStackTrace();
> >
> >                                                                 }
> >
> >                                                 });
> >
> >                                 }
> >
> >                                 producer.flush();
> >
> >                 } catch (IOException e) {
> >
> >                                 Throwables.propagate(e);
> >
> >                 }
> >
> > }
> >
> >
> >
> > private static KafkaProducer<Object, String> initKafkaProducer(String
> > bootstrapServer) {
> >
> >                 Properties properties = new Properties();
> >
> >                 properties.put("bootstrap.servers", bootstrapServer);
> >
> >                 properties.put("key.serializer", StringSerializer.class.
> > getCanonicalName());
> >
> >                 properties.put("value.serializer",
> StringSerializer.class.
> > getCanonicalName());
> >
> >                 properties.put("acks", "-1");
> >
> >                 properties.put("retries", 10);
> >
> >                 return new KafkaProducer<>(properties);
> >
> > }
> >
> >
> >
> > private BufferedReader getBufferedReader(String filePath, String
> encoding)
> > throws UnsupportedEncodingException, FileNotFoundException {
> >
> >                 return new BufferedReader(new InputStreamReader(new
> > FileInputStream(filePath), Optional.ofNullable(encoding).
> > orElse("UTF-8")));
> >
> > }
> >
> >
> >
> > As per the official documentation of Callback<https://kafka.apache.
> > org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > TimeoutException is a retriable exception. As I have kept retries 10,
> > producer will try to resend the message if delivering some message fails
> > with TimeoutException. I am looking for some reliable to way to detect
> when
> > delivery of a message is failed permanently after all retries.
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
>

Reply via email to