Hi Varun,

You could increase `retries`, but it seems you have already configured it to
be `100`. Another option is to increase `retry.backoff.ms`, which will
increase the time between retries.
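
For reference, a minimal sketch of the two settings in `Properties` form (the
values below are illustrative, not recommendations):

```java
import java.util.Properties;

public class ProducerRetryConfig {
    public static Properties retryTuned() {
        Properties props = new Properties();
        // How many times the producer retries a failed send before giving up.
        props.put("retries", "100");
        // Wait between retries (default 100 ms); raising it stretches the
        // total window the producer keeps retrying before a batch expires.
        props.put("retry.backoff.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        // Prints the backoff value set above.
        System.out.println(retryTuned().getProperty("retry.backoff.ms"));
    }
}
```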

Ismael

On Fri, Nov 25, 2016 at 9:38 AM, Phadnis, Varun <phad...@sky.optymyze.com>
wrote:

> Hello,
>
> Sorry for the late response. We tried logging the errors received in the
> callback, and it turns out we are facing TimeoutExceptions:
>
>         org.apache.kafka.common.errors.TimeoutException: Batch containing
> 93 record(s) expired due to timeout while requesting metadata from brokers
> for mp_test2-1
>
> Increasing request.timeout.ms to 100000 (from the default of 30000) stopped
> the messages from being dropped. However, that seems like a solution that
> would not scale if an unpredictable "burst" of network slowness caused a
> longer delay.
>
> Is there a better way to handle this? Is there any other producer/broker
> configuration I could tweak to increase the reliability of the producer?
>
> Thanks,
> Varun
>
> -----Original Message-----
> From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael
> Juma
> Sent: 22 November 2016 08:31
> To: Kafka Users <users@kafka.apache.org>
> Subject: Re: Kafka producer dropping records
>
> Another option which is probably easier is to pass a callback to `send`
> and log errors.
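
With the real client the callback is passed as the second argument to
`producer.send(record, callback)` and receives `(RecordMetadata, Exception)`,
with the exception non-null on failure. The sketch below isolates just the
error-counting logic so it runs without a broker; the `SendErrorLogger` name
and the loose `BiConsumer` typing are my own stand-ins:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

public class SendErrorLogger {
    public final AtomicLong failures = new AtomicLong();

    // Body of a send callback: invoked with (metadata, exception);
    // exception is non-null when the send failed.
    public BiConsumer<Object, Exception> callback() {
        return (metadata, exception) -> {
            if (exception != null) {
                failures.incrementAndGet();
                System.err.println("send failed: " + exception.getMessage());
            }
        };
    }
}
```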
>
> Ismael
>
> On Tue, Nov 22, 2016 at 10:33 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > You can collect the Futures and call `get` in batches. That would give
> > you access to the errors without blocking on each request.
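
As a sketch of that batching pattern: with Kafka, the futures would come from
`producer.send(record)`; the stdlib `CompletableFuture` stand-ins here let the
code run without a broker, and the `BatchedGet` name and batch size of 500 are
illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class BatchedGet {
    // Drain a list of pending futures at a batch boundary, calling get()
    // once per future instead of blocking after every send. Returns the
    // number of failed sends in the batch and clears the list.
    static int drain(List<Future<Integer>> pending) throws InterruptedException {
        int errors = 0;
        for (Future<Integer> f : pending) {
            try {
                f.get();
            } catch (ExecutionException e) {
                errors++;
            }
        }
        pending.clear();
        return errors;
    }

    public static void main(String[] args) throws Exception {
        List<Future<Integer>> batch = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            batch.add(CompletableFuture.completedFuture(i));
            if (batch.size() == 500) {  // batch boundary
                System.out.println("errors in batch: " + drain(batch));
            }
        }
    }
}
```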
> >
> > Ismael
> >
> >
> > On Tue, Nov 22, 2016 at 8:56 AM, Phadnis, Varun
> > <phad...@sky.optymyze.com>
> > wrote:
> >
> >> Hello,
> >>
> >> We had tried that... If future.get() is added in the while loop, it
> >> takes too long for the loop to execute.
> >>
> >> Last time we tried it, it had been running on that file for over 2
> >> hours and still had not finished.
> >>
> >> Regards,
> >> Varun
> >>
> >> -----Original Message-----
> >> From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
> >> Sent: 22 November 2016 02:20
> >> To: users@kafka.apache.org
> >> Subject: Re: Kafka producer dropping records
> >>
> >> The KafkaProducer.send returns a Future<RecordMetadata>. What happens
> >> when you add a future.get() on the returned Future, in that while
> >> loop, for each sent record?
> >>
> >> -Jaikiran
> >>
> >> On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
> >> > Hello,
> >> >
> >> > We have the following piece of code where we read lines from a file
> >> > and push them to a Kafka topic:
> >> >
> >> >         Properties properties = new Properties();
> >> >         properties.put("bootstrap.servers", <bootstrapServers>);
> >> >         properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> >> >         properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> >> >         properties.put("retries", 100);
> >> >         properties.put("acks", "all");
> >> >
> >> >         KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);
> >> >
> >> >         try (BufferedReader bf = new BufferedReader(
> >> >                 new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
> >> >             String line;
> >> >             int count = 0;
> >> >             while ((line = bf.readLine()) != null) {
> >> >                 count++;
> >> >                 producer.send(new ProducerRecord<>(topicName, line));
> >> >             }
> >> >             Logger.log("Done producing data messages. Total no of records produced:" + count);
> >> >         } catch (InterruptedException | ExecutionException | IOException e) {
> >> >             Throwables.propagate(e);
> >> >         } finally {
> >> >             producer.close();
> >> >         }
> >> >
> >> > When we try this with a large file of a million records, only about
> >> > half of them (around 500,000) get written to the topic. In the above
> >> > example, I verified this by running the GetOffset tool after a fair
> >> > amount of time (to ensure all records had finished processing), as
> >> > follows:
> >> >
> >> >
> >> >         ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>
> >> >
> >> > The output of this was:
> >> >
> >> >
> >> >          topic_name:1:292954
> >> >
> >> >          topic_name:0:296787
> >> >
> >> >
> >> > What could be causing this dropping of records?
> >> >
> >> > Thanks,
> >> > Varun
> >> >
> >>
> >>
> >
>
