Hi all, I had a requirement to handle Kafka producer exceptions so that they don’t bring down the job. I extended FlinkKafkaProducer010 and handled the exceptions there.
public void invoke(T value, Context context) throws Exception { try { this.checkErroneous(); ... this.producer.send(record, this.callback); } catch (Exception exception) { // Handle exception } } The problem with this is, because checkErroneous() is at the beginning of the invoke() method, the catch block is getting triggered for the next message – not for the message that is causing the exception. So, I moved checkErroneous() below producer.send() like so – public void invoke(T value, Context context) throws Exception { try { ... this.producer.send(record, this.callback); this.checkErroneous(); } catch (Exception exception) { // Handle exception } } This solved the issue, the exceptions are now being thrown for the message that’s causing the error instead of the next message. Is there a specific reason why checkErroneous() is on top? Or am I doing something wrong? Class: https://github.com/apache/flink/blob/19d20e5cf8d44d726b4a44575e6c8db677e4c3c8/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java Regards, Harshith