checkErroneous() is called at the top because send() is asynchronous:
the exception can be reported by the callback after invoke() has already
returned. In that case the earliest point at which the failure can
surface is when the next record is processed.
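
To illustrate, here is a minimal, self-contained sketch of that timing. It is not the actual Flink or Kafka code: FakeProducer, Sink, completeOldest() and demo() are made-up names, and the error message is only an approximation. The fake producer lets the test decide when the callback fires, so the failure of record "a" arrives after invoke("a") has returned.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

class AsyncSendSketch {

    /** Hypothetical stand-in for a producer whose send() returns before the outcome is known. */
    static class FakeProducer {
        private final Deque<Consumer<Exception>> pending = new ArrayDeque<>();

        void send(String record, Consumer<Exception> callback) {
            pending.add(callback); // queued only; nothing has been acknowledged yet
        }

        /** Simulates the I/O thread reporting the outcome of the oldest pending send. */
        void completeOldest(Exception errorOrNull) {
            pending.remove().accept(errorOrNull);
        }
    }

    /** Simplified sink that checks for earlier asynchronous failures before sending. */
    static class Sink {
        private final FakeProducer producer;
        private volatile Exception asyncException; // written by the callback

        Sink(FakeProducer producer) {
            this.producer = producer;
        }

        void invoke(String value) throws Exception {
            checkErroneous();
            producer.send(value, e -> {
                if (e != null && asyncException == null) {
                    asyncException = e; // remember the first failure
                }
            });
        }

        void checkErroneous() throws Exception {
            Exception e = asyncException;
            if (e != null) {
                asyncException = null;
                throw new Exception("Failed to send data: " + e.getMessage(), e);
            }
        }
    }

    /** Returns the message of the exception raised while invoking "b", or null if none. */
    static String demo() {
        FakeProducer producer = new FakeProducer();
        Sink sink = new Sink(producer);
        try {
            sink.invoke("a"); // returns normally, the outcome is still unknown
        } catch (Exception e) {
            return "unexpected: " + e.getMessage();
        }
        // The failure of "a" is reported only after invoke("a") has exited.
        producer.completeOldest(new RuntimeException("record a rejected"));
        try {
            sink.invoke("b"); // the failure of "a" surfaces here
            return null;
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "Failed to send data: record a rejected"
    }
}
```

In this sketch, moving checkErroneous() below send() would not change the outcome, because the callback has not run yet when invoke("a") returns. Calling it after send() can only catch failures that have already been reported by that point.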
On 12/04/2019 11:00, Kumar Bolar, Harshith wrote:
Hi all,
I had a requirement to handle Kafka producer exceptions so that they
don’t bring down the job. I extended FlinkKafkaProducer010 and handled
the exceptions there.
    public void invoke(T value, Context context) throws Exception {
        try {
            this.checkErroneous();
            ...
            this.producer.send(record, this.callback);
        } catch (Exception exception) {
            // Handle exception
        }
    }
The problem with this is, because checkErroneous() is at the beginning
of the invoke() method, the catch block is getting triggered for the
next message – not for the message that is causing the exception. So,
I moved checkErroneous() below producer.send() like so –
    public void invoke(T value, Context context) throws Exception {
        try {
            ...
            this.producer.send(record, this.callback);
            this.checkErroneous();
        } catch (Exception exception) {
            // Handle exception
        }
    }
This solved the issue: the exceptions are now thrown for the message
that causes the error instead of for the next message.
Is there a specific reason why checkErroneous() is on top? Or am I
doing something wrong?
Class:
https://github.com/apache/flink/blob/19d20e5cf8d44d726b4a44575e6c8db677e4c3c8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
Regards,
Harshith