Hi,


I am reading a file and dumping each record on Kafka. Here is my producer code:



public void produce(String topicName, String filePath, String bootstrapServers, 
String encoding) {

                try (BufferedReader bf = getBufferedReader(filePath, encoding);

                                KafkaProducer<Object, String> producer = 
initKafkaProducer(bootstrapServers)) {

                                String line;

                                while ((line = bf.readLine()) != null) {

                                                producer.send(new 
ProducerRecord<>(topicName, line), (metadata, e) -> {

                                                                if (e != null) {

                                                                                
e.printStackTrace();

                                                                }

                                                });

                                }

                                producer.flush();

                } catch (IOException e) {

                                Throwables.propagate(e);

                }

}



private static KafkaProducer<Object, String> initKafkaProducer(String 
bootstrapServer) {

                Properties properties = new Properties();

                properties.put("bootstrap.servers", bootstrapServer);

                properties.put("key.serializer", 
StringSerializer.class.getCanonicalName());

                properties.put("value.serializer", 
StringSerializer.class.getCanonicalName());

                properties.put("acks", "-1");

                properties.put("retries", 10);

                return new KafkaProducer<>(properties);

}



private BufferedReader getBufferedReader(String filePath, String encoding) 
throws UnsupportedEncodingException, FileNotFoundException {

                return new BufferedReader(new InputStreamReader(new 
FileInputStream(filePath), Optional.ofNullable(encoding).orElse("UTF-8")));

}



As per the official documentation of 
Callback<https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
 TimeoutException is a retriable exception. As I have kept retries 10, producer 
will try to resend the message if delivering some message fails with 
TimeoutException. I am looking for some reliable to way to detect when delivery 
of a message is failed permanently after all retries.



Regards,

Vatsal

Reply via email to