KafkaProducer.send returns a Future<RecordMetadata>. What happens if you call get() on the returned Future for each record sent in that while loop?
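
To show why checking the Future matters, here is a minimal sketch that uses only the JDK. The fakeSend method is a hypothetical stand-in for KafkaProducer.send, not the real client: it returns at once and reports a failure only through the Future, so a caller that drops the Future never sees the error. The class name, the method names, and the "empty line fails" rule are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SendResultCheck {

    // Hypothetical stand-in for KafkaProducer.send(): returns immediately and
    // reports success or failure only through the returned Future.
    static Future<Long> fakeSend(String line, long offset) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (line.isEmpty()) {
            // Simulated delivery failure; nothing is thrown at the call site.
            result.completeExceptionally(new IllegalStateException("simulated send failure"));
        } else {
            result.complete(offset);
        }
        return result;
    }

    // Sends every line, then waits on each Future and counts the failures.
    static int countFailures(List<String> lines) {
        List<Future<Long>> pending = new ArrayList<>();
        long offset = 0;
        for (String line : lines) {
            pending.add(fakeSend(line, offset++));
        }
        int failures = 0;
        for (Future<Long> future : pending) {
            try {
                // Blocks until the send is done; a failed send surfaces here
                // as an ExecutionException wrapping the original cause.
                future.get();
            } catch (ExecutionException e) {
                failures++;
                System.err.println("send failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sends", e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        int failures = countFailures(Arrays.asList("a", "", "c"));
        System.out.println("failed sends: " + failures); // prints "failed sends: 1"
    }
}
```

In your loop, the same idea would mean keeping the Future returned by producer.send and calling get() on it, either per record (simple, but it waits for each send in turn) or after the loop as above. KafkaProducer.send also has an overload that takes a Callback, which lets you log the exception without blocking.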

-Jaikiran

On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
Hello,

We have the following piece of code, which reads lines from a file and pushes 
them to a Kafka topic:

         Properties properties = new Properties();
         properties.put("bootstrap.servers", <bootstrapServers>);
         properties.put("key.serializer", StringSerializer.class.getCanonicalName());
         properties.put("value.serializer", StringSerializer.class.getCanonicalName());
         properties.put("retries", 100);
         properties.put("acks", "all");

         KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);

         try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
             String line;
             int count = 0;
             while ((line = bf.readLine()) != null) {
                 count++;
                 producer.send(new ProducerRecord<>(topicName, line));
             }
             Logger.log("Done producing data messages. Total no of records produced:" + count);
         } catch (InterruptedException | ExecutionException | IOException e) {
             Throwables.propagate(e);
         } finally {
             producer.close();
         }

When we try this with a large file of a million records, only about half of 
them, around 500,000, get written to the topic. In the above example, I 
verified this by running the GetOffsetShell tool after a fair amount of time 
(to ensure all records had finished processing) as follows:


         ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>




The output of this was :


         topic_name:1:292954
         topic_name:0:296787


What could be causing these records to be dropped?

Thanks,
Varun

