KafkaProducer.send returns a Future<RecordMetadata>. What happens if you
call get() on the returned Future inside that while loop, for each record
sent?
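To make the idea concrete, here is a minimal sketch of the pattern. It does not talk to Kafka: fakeSend is a hypothetical stand-in that only mimics the shape of KafkaProducer.send (it returns right away and reports the outcome through the Future), and the "empty line fails" rule is an assumption made purely to simulate a failed send. The point is that a failure is only visible to the caller once get() is called on the Future.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SendAndCheck {

    // Hypothetical stand-in for KafkaProducer.send(): returns immediately and
    // reports success or failure only through the returned Future.
    static Future<Long> fakeSend(String line, long offset) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (line.isEmpty()) {
            // Simulated delivery failure (assumption for this sketch only).
            result.completeExceptionally(new IllegalStateException("send failed"));
        } else {
            result.complete(offset);
        }
        return result;
    }

    // Sends every line, waits on each Future, and returns the number of failed sends.
    static int sendAll(List<String> lines) {
        int failed = 0;
        long offset = 0;
        for (String line : lines) {
            Future<Long> future = fakeSend(line, offset++);
            try {
                future.get(); // blocks until this send has succeeded or failed
            } catch (ExecutionException e) {
                // The send failed; without get() this would go unnoticed.
                failed++;
                System.err.println("Send failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Interrupted while waiting for send", e);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "", "b", "");
        System.out.println("failed sends: " + sendAll(lines));
    }
}
```

In your loop the equivalent change would be producer.send(...).get(), which throws an ExecutionException if that record could not be delivered. Blocking on every record will slow the producer down considerably, so treat it as a diagnostic step; passing a Callback as the second argument to send is the non-blocking way to see the same errors.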
-Jaikiran
On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
Hello,
We have the following piece of code, where we read lines from a file and push
them to a Kafka topic:
    Properties properties = new Properties();
    properties.put("bootstrap.servers", <bootstrapServers>);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("retries", 100);
    properties.put("acks", "all");

    KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);

    try (BufferedReader bf = new BufferedReader(
            new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
        String line;
        int count = 0;
        while ((line = bf.readLine()) != null) {
            count++;
            producer.send(new ProducerRecord<>(topicName, line));
        }
        Logger.log("Done producing data messages. Total no of records produced:" + count);
    } catch (InterruptedException | ExecutionException | IOException e) {
        Throwables.propagate(e);
    } finally {
        producer.close();
    }
When we try this with a large file of a million records, only about half of
them, around 500,000, get written to the topic. I verified this by running the
GetOffsetShell tool after a fair amount of time (to ensure that all records had
finished processing), as follows:
    ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>
The output was:
topic_name:1:292954
topic_name:0:296787
What could be causing these records to be dropped?
Thanks,
Varun