Hello,

Sorry for the late response. We tried logging the errors received in the callback, and the result is that we are facing TimeoutExceptions:

    org.apache.kafka.common.errors.TimeoutException: Batch containing 93 record(s) expired due to timeout while requesting metadata from brokers for mp_test2-1

Increasing request.timeout.ms to 100000 (from the default of 30000) stopped the messages from being dropped. However, that seems like a solution which would not scale if an unpredictable "burst" of network slowness caused a longer delay.

Is there a better way to handle this? Is there any other producer/broker configuration I could tweak to increase the reliability of the producer?

Thanks,
Varun
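P.S. For reference, the producer configuration now looks roughly like this; a minimal sketch that just combines the settings from the original code (quoted below) with the increased timeout:

    Properties properties = new Properties();
    properties.put("bootstrap.servers", <bootstrapServers>);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("acks", "all");                   // wait for all in-sync replicas to acknowledge
    properties.put("retries", 100);                  // retry transient send failures
    properties.put("request.timeout.ms", "100000");  // raised from the default of 30000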
-----Original Message-----
From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael Juma
Sent: 22 November 2016 08:31
To: Kafka Users <users@kafka.apache.org>
Subject: Re: Kafka producer dropping records

Another option which is probably easier is to pass a callback to `send` and log errors.
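Something along these lines; an untested sketch that reuses the `producer`, `topicName`, `line`, and `Logger` from the code quoted below (Callback and RecordMetadata live in org.apache.kafka.clients.producer):

    producer.send(new ProducerRecord<>(topicName, line), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Invoked on the producer's I/O thread once the send completes
            if (exception != null) {
                Logger.log("Failed to send record: " + exception);
            }
        }
    });

This keeps the send loop non-blocking while still surfacing every failure.

Ismael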
On Tue, Nov 22, 2016 at 10:33 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> You can collect the Futures and call `get` in batches. That would give
> you access to the errors without blocking on each request.
>
> Ismael
>
> On Tue, Nov 22, 2016 at 8:56 AM, Phadnis, Varun <phad...@sky.optymyze.com> wrote:
>> Hello,
>>
>> We had tried that... If future.get() is added in the while loop, it
>> takes too long for the loop to execute.
>>
>> Last time we tried it, it was running for that file for over 2 hours
>> and had still not finished.
>>
>> Regards,
>> Varun
>>
>> -----Original Message-----
>> From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
>> Sent: 22 November 2016 02:20
>> To: users@kafka.apache.org
>> Subject: Re: Kafka producer dropping records
>>
>> KafkaProducer.send returns a Future<RecordMetadata>. What happens
>> when you add a future.get() on the returned Future, in that while
>> loop, for each sent record?
>>
>> -Jaikiran
>>
>> On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
>> > Hello,
>> >
>> > We have the following piece of code where we read lines from a file
>> > and push them to a Kafka topic:
>> >
>> >     Properties properties = new Properties();
>> >     properties.put("bootstrap.servers", <bootstrapServers>);
>> >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
>> >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
>> >     properties.put("retries", 100);
>> >     properties.put("acks", "all");
>> >
>> >     KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);
>> >
>> >     try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
>> >         String line;
>> >         int count = 0;
>> >         while ((line = bf.readLine()) != null) {
>> >             count++;
>> >             producer.send(new ProducerRecord<>(topicName, line));
>> >         }
>> >         Logger.log("Done producing data messages. Total no of records produced:" + count);
>> >     } catch (InterruptedException | ExecutionException | IOException e) {
>> >         Throwables.propagate(e);
>> >     } finally {
>> >         producer.close();
>> >     }
>> >
>> > When we try this with a large file with a million records, only about
>> > half of them, around 500,000, get written to the topic. I verified this
>> > by running the GetOffset tool after a fair amount of time (to ensure
>> > all records had finished processing) as follows:
>> >
>> >     ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>
>> >
>> > The output of this was:
>> >
>> >     topic_name:1:292954
>> >     topic_name:0:296787
>> >
>> > What could be causing this dropping of records?
>> >
>> > Thanks,
>> > Varun
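For completeness, here is a rough illustration of the "collect the Futures and call `get` in batches" suggestion quoted above. It is an untested sketch that reuses `bf`, `producer`, `topicName`, and `Logger` from the code in the original message, with a hypothetical batch size of 1000 (imports from java.util and java.util.concurrent elided, as in the original code):

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    String line;
    while ((line = bf.readLine()) != null) {
        futures.add(producer.send(new ProducerRecord<>(topicName, line)));
        if (futures.size() >= 1000) {                // hypothetical batch size
            for (Future<RecordMetadata> f : futures) {
                try {
                    f.get();                         // blocks; surfaces any error for this record
                } catch (InterruptedException | ExecutionException e) {
                    Logger.log("Send failed: " + e);
                }
            }
            futures.clear();
        }
    }
    // Drain any futures left over from the final partial batch the same way.

Because the loop only blocks once per 1000 records, most sends complete in the background while a batch fills, so this is much faster than calling get() after every single send.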