Hi Ninad,

Thanks for reporting the issue. It also looks to me as if exceptions might go
unnoticed under certain circumstances. For example, when a write operation
fails, the callback sets the asyncException field, but that field is not
checked until the next invoke call happens. If a checkpoint operation happens
in between, it will pass and mark all messages up to this point as
successfully processed. Only after the checkpoint will the producer fail,
and this constitutes data loss imho.
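To make the ordering problem concrete, here is a minimal Java sketch of one way to close the gap. It assumes a hypothetical `CheckedAsyncSink` class whose `invoke`, `onCompletion` and `snapshotState` methods stand in for the sink method, the Kafka send callback and the checkpoint hook; it is not the actual Flink code. The checkpoint first waits until all in-flight records have been acknowledged and only then checks for a recorded error, so it cannot succeed while a failed write is still unreported.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sink skeleton (not Flink's actual FlinkKafkaProducerBase):
// tracks in-flight records and the first asynchronous send failure.
class CheckedAsyncSink {
    private final AtomicInteger pendingRecords = new AtomicInteger();
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();

    // Rethrows a failure recorded by an earlier callback, if any.
    void rethrowAsyncFailure() {
        Exception e = asyncException.get();
        if (e != null) {
            throw new IllegalStateException("Failed to send data to Kafka", e);
        }
    }

    // Checks for earlier failures before handing the next record to the producer.
    void invoke(Runnable asyncSend) {
        rethrowAsyncFailure();
        pendingRecords.incrementAndGet();
        asyncSend.run();
    }

    // Stand-in for the producer's completion callback.
    void onCompletion(Exception e) {
        if (e != null) {
            asyncException.compareAndSet(null, e); // keep the first error only
        }
        pendingRecords.decrementAndGet();
    }

    // Checkpoint hook: wait until every in-flight record is acknowledged, then
    // check for errors, so a checkpoint cannot complete over a failed write.
    void snapshotState() {
        while (pendingRecords.get() > 0) {
            Thread.yield(); // a real sink would flush the producer or block on a lock
        }
        rethrowAsyncFailure();
    }
}
```

The atomics are used because, in a real job, the callback runs on the producer's I/O thread rather than on the thread calling `invoke`.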

I've looped in Robert and Gordon, who are more familiar with the Kafka
producer. Maybe they can answer your questions and mine.

Cheers,
Till

On Thu, Feb 2, 2017 at 9:58 PM, ninad <nni...@gmail.com> wrote:

> Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's
> 'retry' mechanism doesn't kick in until a message is added to its internal
> buffer.
>
> If there's an exception before that, KafkaProducer will throw that
> exception, and it seems like Flink isn't handling it. In this case there
> will be data loss.
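For the synchronous case described here, one option is to catch the exception at the call site and feed it into the same path as asynchronous failures, so neither kind is silently dropped. A minimal sketch; `RecordSender` and `SafeSend` are hypothetical stand-ins, not Kafka or Flink APIs, and exactly which exceptions the real KafkaProducer throws synchronously depends on the client version.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a producer whose send() can fail in two ways:
// synchronously (an exception thrown before the record is buffered, e.g. a
// metadata timeout) or asynchronously (an exception passed to the callback).
interface RecordSender {
    void send(String record, Consumer<Exception> callback);
}

class SafeSend {
    // Routes a synchronous failure into the same callback used for async
    // failures, so the caller handles both paths identically.
    static void send(RecordSender sender, String record, Consumer<Exception> callback) {
        try {
            sender.send(record, callback);
        } catch (RuntimeException e) {
            callback.accept(e);
        }
    }
}
```

Unifying both paths is a design choice; letting the exception propagate out of `invoke` and fail the job would be an equally valid way to avoid silent loss.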
>
> Related Flink code (FlinkKafkaProducerBase):
>
> if (logFailuresOnly) {
>     callback = new Callback() {
>         @Override
>         public void onCompletion(RecordMetadata metadata, Exception e) {
>             if (e != null) {
>                 LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
>             }
>             acknowledgeMessage();
>         }
>     };
> } else {
>     callback = new Callback() {
>         @Override
>         public void onCompletion(RecordMetadata metadata, Exception exception) {
>             if (exception != null && asyncException == null) {
>                 asyncException = exception;
>             }
>             acknowledgeMessage();
>         }
>     };
> }
>
> Here are the scenarios we've identified that will cause data loss:
>
> - All Kafka brokers are down.
>
>   In this case, before appending a message to its buffer, KafkaProducer
>   tries to fetch metadata. If the KafkaProducer isn't able to fetch the
>   metadata within the configured timeout, it throws an exception.
>
> - Memory records not writable (existing bug in the Kafka 0.9.0.1 library):
>   https://issues.apache.org/jira/browse/KAFKA-3594
>
> In both of the above cases, KafkaProducer won't retry, and Flink will
> ignore the messages. The messages aren't even logged: the exception is,
> but not the messages that failed.
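One way to address the logging gap is for each callback to close over the record it was created for, so the failure log names the lost message and not just the exception. A minimal sketch, assuming String records and a hypothetical `RecordAwareCallbacks` helper; it uses `java.util.logging` to stay self-contained rather than the logger Flink itself uses.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: builds a completion handler that captures the record
// being sent, so a failure log line identifies the message that was lost.
class RecordAwareCallbacks {
    private static final Logger LOG = Logger.getLogger(RecordAwareCallbacks.class.getName());

    static String describeFailure(String topic, String record, Exception e) {
        return "Failed to send record to topic '" + topic + "': " + record
                + " (" + e.getMessage() + ")";
    }

    // One callback per record; nothing is logged when e is null (success).
    static Consumer<Exception> loggingCallback(String topic, String record) {
        return e -> {
            if (e != null) {
                LOG.log(Level.SEVERE, describeFailure(topic, record, e), e);
            }
        };
    }
}
```

For large or sensitive payloads, logging a key or offset-like identifier instead of the full record may be preferable.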
>
> Possible workarounds (Kafka settings):
>
> - A very high value for the metadata timeout (metadata.fetch.timeout.ms)
> - A very high value for buffer expiry (request.timeout.ms)
>
> We're still investigating the possible side effects of changing the above
> Kafka settings.
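The two workaround settings can be passed as ordinary producer properties. A minimal sketch, assuming a 0.9.x Kafka producer client that recognizes both keys (check the configuration reference for your client version); the class name, broker address and the value 600000 ms are hypothetical and illustrative, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper building producer properties for the workaround above.
class WorkaroundProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait longer for topic metadata before send() gives up (illustrative value).
        props.setProperty("metadata.fetch.timeout.ms", "600000");
        // Keep buffered records longer before they expire (illustrative value).
        props.setProperty("request.timeout.ms", "600000");
        return props;
    }
}
```

These properties would then be handed to the Flink Kafka producer constructor along with the topic and serialization schema.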
>
> So, is our understanding correct? Or is there a way we can avoid this data
> loss by modifying some Flink settings?
>
> Thanks.
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
