Another option, which is probably easier, is to pass a callback to `send` and log any errors. That way you find out which sends failed without paying a round trip per record.
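Something along these lines (a rough sketch based on the loop in Varun's snippet; Logger.log is the helper from that code, and the failure counter is just for illustration):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import java.util.concurrent.atomic.AtomicLong;

    // Count failures from the callback; the send loop itself never blocks.
    final AtomicLong failures = new AtomicLong();

    while ((line = bf.readLine()) != null) {
        count++;
        producer.send(new ProducerRecord<>(topicName, line), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // exception is non-null if the send (including retries) failed
                if (exception != null) {
                    failures.incrementAndGet();
                    Logger.log("Send failed: " + exception);
                }
            }
        });
    }

    // flush() blocks until all buffered records have completed
    producer.flush();
    Logger.log("Failed sends: " + failures.get());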
Ismael

On Tue, Nov 22, 2016 at 10:33 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> You can collect the Futures and call `get` in batches. That would give
> you access to the errors without blocking on each request.
>
> Ismael
>
> On Tue, Nov 22, 2016 at 8:56 AM, Phadnis, Varun <phad...@sky.optymyze.com>
> wrote:
>
>> Hello,
>>
>> We had tried that... If future.get() is added in the while loop, it
>> takes too long for the loop to execute.
>>
>> Last time we tried it, it ran on that file for over 2 hours and still
>> had not finished.
>>
>> Regards,
>> Varun
>>
>> -----Original Message-----
>> From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
>> Sent: 22 November 2016 02:20
>> To: users@kafka.apache.org
>> Subject: Re: Kafka producer dropping records
>>
>> KafkaProducer.send returns a Future<RecordMetadata>. What happens when
>> you add a future.get() on the returned Future, in that while loop, for
>> each sent record?
>>
>> -Jaikiran
>>
>> On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
>> > Hello,
>> >
>> > We have the following piece of code where we read lines from a file
>> > and push them to a Kafka topic:
>> >
>> >     Properties properties = new Properties();
>> >     properties.put("bootstrap.servers", <bootstrapServers>);
>> >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
>> >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
>> >     properties.put("retries", 100);
>> >     properties.put("acks", "all");
>> >
>> >     KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);
>> >
>> >     try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
>> >         String line;
>> >         int count = 0;
>> >         while ((line = bf.readLine()) != null) {
>> >             count++;
>> >             producer.send(new ProducerRecord<>(topicName, line));
>> >         }
>> >         Logger.log("Done producing data messages. Total no of records produced: " + count);
>> >     } catch (InterruptedException | ExecutionException | IOException e) {
>> >         Throwables.propagate(e);
>> >     } finally {
>> >         producer.close();
>> >     }
>> >
>> > When we try this with a large file with a million records, only about
>> > half of them (around 500,000) get written to the topic. In the above
>> > example, I verified this by running the GetOffset tool after a fair
>> > amount of time (to ensure all records had finished processing) as
>> > follows:
>> >
>> >     ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>
>> >
>> > The output of this was:
>> >
>> >     topic_name:1:292954
>> >     topic_name:0:296787
>> >
>> > What could be causing this dropping of records?
>> >
>> > Thanks,
>> > Varun
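P.S. For reference, a rough sketch of the batching approach from my earlier reply, dropped into the same try block as Varun's snippet. BATCH_SIZE is arbitrary, and the error handling is illustrative:

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    List<Future<RecordMetadata>> batch = new ArrayList<>();
    final int BATCH_SIZE = 10000; // arbitrary; tune for your setup

    while ((line = bf.readLine()) != null) {
        batch.add(producer.send(new ProducerRecord<>(topicName, line)));
        if (batch.size() >= BATCH_SIZE) {
            for (Future<RecordMetadata> f : batch) {
                try {
                    // Most of these futures are already complete by the time
                    // we get here, so get() rarely blocks for long.
                    f.get();
                } catch (InterruptedException | ExecutionException e) {
                    Logger.log("Send failed: " + e);
                }
            }
            batch.clear();
        }
    }
    // Remember to drain the final partial batch the same way after the loop.

The idea is that by the time a batch is full, most of its sends have already completed in the background, so you only block on the stragglers instead of on every record.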