There's a critical bug in that section that has only been fixed in 0.9.0.2 which has not been release yet. Without the fix it doesn't really retry. I forked the kafka repo, applied the fix, built it and placed it in our own Nexus Maven repository until 0.9.0.2 will be released.
https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio Feel free to use it. On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote: > The callback should give you what you are asking for. Has it not worked as > you expect when you tried it? > > Ismael > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal <mev...@sky.optymyze.com> > wrote: > > > Hi, > > > > > > > > I am reading a file and dumping each record on Kafka. Here is my producer > > code: > > > > > > > > public void produce(String topicName, String filePath, String > > bootstrapServers, String encoding) { > > > > try (BufferedReader bf = getBufferedReader(filePath, > > encoding); > > > > KafkaProducer<Object, String> producer = > > initKafkaProducer(bootstrapServers)) { > > > > String line; > > > > while ((line = bf.readLine()) != null) { > > > > producer.send(new > > ProducerRecord<>(topicName, line), (metadata, e) -> { > > > > if (e != > > null) { > > > > > > e.printStackTrace(); > > > > } > > > > }); > > > > } > > > > producer.flush(); > > > > } catch (IOException e) { > > > > Throwables.propagate(e); > > > > } > > > > } > > > > > > > > private static KafkaProducer<Object, String> initKafkaProducer(String > > bootstrapServer) { > > > > Properties properties = new Properties(); > > > > properties.put("bootstrap.servers", bootstrapServer); > > > > properties.put("key.serializer", StringSerializer.class. > > getCanonicalName()); > > > > properties.put("value.serializer", > StringSerializer.class. > > getCanonicalName()); > > > > properties.put("acks", "-1"); > > > > properties.put("retries", 10); > > > > return new KafkaProducer<>(properties); > > > > } > > > > > > > > private BufferedReader getBufferedReader(String filePath, String > encoding) > > throws UnsupportedEncodingException, FileNotFoundException { > > > > return new BufferedReader(new InputStreamReader(new > > FileInputStream(filePath), Optional.ofNullable(encoding). > > orElse("UTF-8"))); > > > > } > > > > > > > > As per the official documentation of Callback<https://kafka.apache. > > org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>, > > TimeoutException is a retriable exception. As I have kept retries 10, > > producer will try to resend the message if delivering some message fails > > with TimeoutException. I am looking for some reliable to way to detect > when > > delivery of a message is failed permanently after all retries. > > > > > > > > Regards, > > > > Vatsal > > >