That tells you that the acknowledgements (which in your case you have
set to require from all brokers in the ISR) aren't happening, which
essentially means the records aren't making it to the topic. How many
brokers do you have? What's the replication factor on the topic, and
which brokers are in the ISR for it? Ultimately, you have to figure out
why these ACKs aren't happening.
-Jaikiran
On Tuesday 22 November 2016 02:26 PM, Phadnis, Varun wrote:
Hello,
We had tried that... If future.get() is added in the while loop, the
loop takes too long to execute.
Last time we tried it, it had been running on that file for over 2 hours
and still had not finished.
Regards,
Varun
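For what it's worth, calling get() on each Future inside the loop makes every send effectively synchronous, which is why it is so slow. A middle ground, sketched below under the assumption of the standard kafka-clients API, is to keep the sends asynchronous, collect the returned futures, and block only once at the end (producer.flush() blocks until every buffered record is sent or has failed):

```java
// Assumed imports: java.util.*, java.util.concurrent.Future,
// org.apache.kafka.clients.producer.*
List<Future<RecordMetadata>> futures = new ArrayList<>();
String line;
while ((line = bf.readLine()) != null) {
    // send() is asynchronous; keep the Future instead of blocking on it here
    futures.add(producer.send(new ProducerRecord<>(topicName, line)));
}
producer.flush(); // block once, until all buffered records complete
for (Future<RecordMetadata> f : futures) {
    f.get(); // rethrows the real failure cause (as ExecutionException) if a record was dropped
}
```

Holding a million Future references is cheap compared to a blocking get() per record, and the final loop will point at the first record that actually failed.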
-----Original Message-----
From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
Sent: 22 November 2016 02:20
To: users@kafka.apache.org
Subject: Re: Kafka producer dropping records
The KafkaProducer.send returns a Future<RecordMetadata>. What happens when you
add a future.get() on the returned Future, in that while loop, for each sent record?
-Jaikiran
On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
Hello,
We have the following piece of code where we read lines from a file and push
them to a Kafka topic:
Properties properties = new Properties();
properties.put("bootstrap.servers", <bootstrapServers>);
properties.put("key.serializer", StringSerializer.class.getCanonicalName());
properties.put("value.serializer", StringSerializer.class.getCanonicalName());
properties.put("retries", 100);
properties.put("acks", "all");

KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);

try (BufferedReader bf = new BufferedReader(
        new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
    String line;
    int count = 0;
    while ((line = bf.readLine()) != null) {
        count++;
        producer.send(new ProducerRecord<>(topicName, line));
    }
    Logger.log("Done producing data messages. Total no of records produced: " + count);
} catch (InterruptedException | ExecutionException | IOException e) {
    Throwables.propagate(e);
} finally {
    producer.close();
}
When we try this with a large file of a million records, only about half of
them (around 500,000) get written to the topic. In the above example, I
verified this by running the GetOffsetShell tool after a fair amount of time
(to ensure all records had finished processing), as follows:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>

The output of this was:
topic_name:1:292954
topic_name:0:296787
What could be causing this dropping of records?
Thanks,
Varun