[ https://issues.apache.org/jira/browse/FLINK-14034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283750#comment-17283750 ]
Piotr Nowojski edited comment on FLINK-14034 at 2/12/21, 3:07 PM: ------------------------------------------------------------------ But what's preventing you from trying something like that: {code:java} public class ExceptionIgnoringSinkWrapper<IN> implements RichFunction<IN>, CheckpointedFunction, CheckpointListener{ private final RichFunction<IN> wrapped; public ExceptionIgnoringSinkWrapper(RichFunction<IN> wrapped) { this.wrapped = wrapped; } ... @Override public final void invoke(IN value) throws Exception { try { wrapped.invoke(value); } catch (Exception ex) { } } @Override public final void invoke(IN value, Context context) throws Exception { try { wrapped.invoke(value, context); } catch (Exception ex) { } } ... } dataStream.addSink(new ExceptionIgnoringSinkWrapper(new FlinkKafkaProducer(...)); {code} ? As long as you proxy/forward all of the public methods from the interfaces: RichSinkFunction<IN>, CheckpointedFunction, CheckpointListener it should work (% that underlying KafkaProducer or FlinkKafkaProducer might not be able to recover from exceptions. But even in that case, you could try to not only ignore the exceptions, but re-initialize the wrapped function. was (Author: pnowojski): But what's preventing you from trying something like that: {code:java} public class ExceptionIgnoringSinkWrapper<IN> implements RichFunction<IN>, CheckpointedFunction, CheckpointListener{ private final RichFunction<IN> wrapped; public ExceptionIgnoringSinkWrapper(RichFunction<IN> wrapped) { this.wrapped = wrapped; } ... @Override public final void invoke(IN value) throws Exception { try { wrapped.invoke(value); } catch (Exception ex) { } } @Override public final void invoke(IN value, Context context) throws Exception { try { wrapped.invoke(value, context); } catch (Exception ex) { } } } dataStream.addSink(new ExceptionIgnoringSinkWrapper(new FlinkKafkaProducer(...)); {code} ? As long as you proxy/forward all of the public methods from the interfaces: RichSinkFunction<IN>, CheckpointedFunction, CheckpointListener it should work (% that underlying KafkaProducer or FlinkKafkaProducer might not be able to recover from exceptions. But even in that case, you could try to not only ignore the exceptions, but re-initialize the wrapped function. > In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke > should be made final > ------------------------------------------------------------------------------------------------- > > Key: FLINK-14034 > URL: https://issues.apache.org/jira/browse/FLINK-14034 > Project: Flink > Issue Type: Wish > Components: Connectors / Kafka > Affects Versions: 1.9.0 > Reporter: Niels van Kaam > Priority: Trivial > > It is not possible to override the invoke method of the FlinkKafkaProducer, > because the first parameter, KafkaTransactionState, is a private inner class. > It is not possible to override the original invoke of SinkFunction, because > TwoPhaseCommitSinkFunction (which is implemented by FlinkKafkaProducer) does > override the original invoke method with final. > [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java] > If there is a particular reason for this, it would be better to make the > invoke method in FlinkKafkaProducer final as well, and document the reason > such that it is clear this is by design (I don't see any overrides in the > same package). > Otherwise, I would make the KafkaTransactionState publicly visible. I would > like to override the Invoke method to create a custom KafkaProducer which > performs some additional generic validations and transformations. (which can > also be done in a process-function, but a custom sink would simplify the code > of jobs) > -- This message was sent by Atlassian Jira (v8.3.4#803005)