The callback should give you what you are asking for. Has it not worked as you expect when you tried it?
Ismael On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal <mev...@sky.optymyze.com> wrote: > Hi, > > > > I am reading a file and dumping each record on Kafka. Here is my producer > code: > > > > public void produce(String topicName, String filePath, String > bootstrapServers, String encoding) { > > try (BufferedReader bf = getBufferedReader(filePath, > encoding); > > KafkaProducer<Object, String> producer = > initKafkaProducer(bootstrapServers)) { > > String line; > > while ((line = bf.readLine()) != null) { > > producer.send(new > ProducerRecord<>(topicName, line), (metadata, e) -> { > > if (e != > null) { > > > e.printStackTrace(); > > } > > }); > > } > > producer.flush(); > > } catch (IOException e) { > > Throwables.propagate(e); > > } > > } > > > > private static KafkaProducer<Object, String> initKafkaProducer(String > bootstrapServer) { > > Properties properties = new Properties(); > > properties.put("bootstrap.servers", bootstrapServer); > > properties.put("key.serializer", StringSerializer.class. > getCanonicalName()); > > properties.put("value.serializer", StringSerializer.class. > getCanonicalName()); > > properties.put("acks", "-1"); > > properties.put("retries", 10); > > return new KafkaProducer<>(properties); > > } > > > > private BufferedReader getBufferedReader(String filePath, String encoding) > throws UnsupportedEncodingException, FileNotFoundException { > > return new BufferedReader(new InputStreamReader(new > FileInputStream(filePath), Optional.ofNullable(encoding). > orElse("UTF-8"))); > > } > > > > As per the official documentation of Callback<https://kafka.apache. > org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>, > TimeoutException is a retriable exception. As I have kept retries 10, > producer will try to resend the message if delivering some message fails > with TimeoutException. I am looking for some reliable to way to detect when > delivery of a message is failed permanently after all retries. > > > > Regards, > > Vatsal >