The callback should give you what you are asking for. Did it not work as
you expected when you tried it?
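
As far as I know, the producer invokes the callback once per record, and only
when the send has either succeeded or finally failed (retries exhausted or a
non-retriable error), so a non-null exception there already means a permanent
failure. A minimal sketch of collecting those failures follows.
FailedRecordTracker and its method names are hypothetical, not part of the
Kafka API, and the Kafka wiring is shown only in comments so that the block
compiles without the client library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical helper (not part of the Kafka API): remembers every record
 * whose send completed with an exception.
 */
final class FailedRecordTracker {
    // Thread-safe queue, because the producer runs callbacks on its I/O thread
    // while the caller may read the result from another thread.
    private final Queue<String> failed = new ConcurrentLinkedQueue<>();

    /** Mirrors the callback's exception argument: error is null on success. */
    void onCompletion(String record, Exception error) {
        if (error != null) {
            failed.add(record);
        }
    }

    /** Snapshot of the failed records, in the order their sends completed. */
    List<String> failedRecords() {
        return new ArrayList<>(failed);
    }
}

// Assumed wiring inside the read loop of produce(). The copy into a final
// local is needed because `line` is reassigned and cannot be captured directly:
//
//   final String record = line;
//   producer.send(new ProducerRecord<>(topicName, record),
//           (metadata, e) -> tracker.onCompletion(record, e));
```

Since flush() blocks until the buffered sends have completed, reading
tracker.failedRecords() after producer.flush() returns should give you the
lines that were not delivered, which you can then log or write to a retry file.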

Ismael

On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal <mev...@sky.optymyze.com>
wrote:

> Hi,
>
>
>
> I am reading a file and dumping each record on Kafka. Here is my producer
> code:
>
>
>
> public void produce(String topicName, String filePath,
>         String bootstrapServers, String encoding) {
>     try (BufferedReader bf = getBufferedReader(filePath, encoding);
>          KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
>         String line;
>         while ((line = bf.readLine()) != null) {
>             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
>                 if (e != null) {
>                     e.printStackTrace();
>                 }
>             });
>         }
>         producer.flush();
>     } catch (IOException e) {
>         Throwables.propagate(e);
>     }
> }
>
>
>
> private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
>     Properties properties = new Properties();
>     properties.put("bootstrap.servers", bootstrapServer);
>     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
>     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
>     properties.put("acks", "-1");
>     properties.put("retries", 10);
>     return new KafkaProducer<>(properties);
> }
>
>
>
> private BufferedReader getBufferedReader(String filePath, String encoding)
>         throws UnsupportedEncodingException, FileNotFoundException {
>     return new BufferedReader(new InputStreamReader(
>             new FileInputStream(filePath),
>             Optional.ofNullable(encoding).orElse("UTF-8")));
> }
>
>
>
> As per the official documentation of Callback
> <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> TimeoutException is a retriable exception. Since I have set retries to 10,
> the producer will try to resend a message if its delivery fails with a
> TimeoutException. I am looking for a reliable way to detect when delivery
> of a message has failed permanently, after all retries.
>
>
>
> Regards,
>
> Vatsal
>
