[ 
https://issues.apache.org/jira/browse/FLINK-14034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283750#comment-17283750
 ] 

Piotr Nowojski edited comment on FLINK-14034 at 2/12/21, 3:07 PM:
------------------------------------------------------------------

But what's preventing you from trying something like this?

{code:java}
// Wraps another sink and swallows any exception thrown while writing a record.
public class ExceptionIgnoringSinkWrapper<IN> extends RichSinkFunction<IN>
    implements CheckpointedFunction, CheckpointListener {
  private final RichSinkFunction<IN> wrapped;

  public ExceptionIgnoringSinkWrapper(RichSinkFunction<IN> wrapped) {
    this.wrapped = wrapped;
  }

  ...

  @Override
  public final void invoke(IN value) throws Exception {
    try {
      wrapped.invoke(value);
    } catch (Exception ex) {
      // Deliberately ignored: the failed record is dropped.
    }
  }

  @Override
  public final void invoke(IN value, Context context) throws Exception {
    try {
      wrapped.invoke(value, context);
    } catch (Exception ex) {
      // Deliberately ignored: the failed record is dropped.
    }
  }
  
  ...
}

dataStream.addSink(new ExceptionIgnoringSinkWrapper<>(new 
FlinkKafkaProducer<>(...)));
{code}

As long as you proxy/forward all of the public methods from 
RichSinkFunction<IN>, CheckpointedFunction and CheckpointListener, it should 
work, modulo the possibility that the underlying KafkaProducer or 
FlinkKafkaProducer might not be able to recover from exceptions. But even in 
that case, you could try to not only ignore the exceptions, but also 
re-initialize the wrapped function. 
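
The "ignore and re-initialize" idea could look roughly like the sketch below. To keep it runnable without Flink on the classpath, it does not use any Flink types: SimpleSink, ReinitializingSinkWrapper, FlakySink and WrapperDemo are all hypothetical names made up for illustration, and the Supplier-based factory is just one assumed way of obtaining a fresh instance of the wrapped sink. A real wrapper would additionally have to forward the checkpointing methods mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a sink function; NOT a Flink interface.
interface SimpleSink<IN> {
    void open() throws Exception;
    void invoke(IN value) throws Exception;
    void close() throws Exception;
}

// Ignores exceptions thrown by invoke() and swaps in a freshly created sink.
class ReinitializingSinkWrapper<IN> implements SimpleSink<IN> {
    private final Supplier<SimpleSink<IN>> factory; // creates replacement sinks
    private SimpleSink<IN> wrapped;
    private int failures;

    ReinitializingSinkWrapper(Supplier<SimpleSink<IN>> factory) {
        this.factory = factory;
        this.wrapped = factory.get();
    }

    @Override
    public void open() {
        try {
            wrapped.open();
        } catch (Exception ex) {
            // A sink that cannot even be opened is not recoverable here.
            throw new IllegalStateException("could not open sink", ex);
        }
    }

    @Override
    public void invoke(IN value) {
        try {
            wrapped.invoke(value);
        } catch (Exception ex) {
            // The record is dropped and the possibly broken sink is replaced.
            failures++;
            closeQuietly();
            wrapped = factory.get();
            open();
        }
    }

    @Override
    public void close() {
        closeQuietly();
    }

    int failures() {
        return failures;
    }

    private void closeQuietly() {
        try {
            wrapped.close();
        } catch (Exception ignored) {
            // Best effort: the old instance is being discarded anyway.
        }
    }
}

// Hypothetical sink that fails on the value "bad" and records everything else.
class FlakySink implements SimpleSink<String> {
    private final List<String> delivered;

    FlakySink(List<String> delivered) {
        this.delivered = delivered;
    }

    @Override
    public void open() {}

    @Override
    public void invoke(String value) throws Exception {
        if (value.equals("bad")) {
            throw new IllegalStateException("cannot send " + value);
        }
        delivered.add(value);
    }

    @Override
    public void close() {}
}

class WrapperDemo {
    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        ReinitializingSinkWrapper<String> sink =
            new ReinitializingSinkWrapper<>(() -> new FlakySink(delivered));
        sink.open();
        for (String value : List.of("a", "bad", "b")) {
            sink.invoke(value);
        }
        sink.close();
        System.out.println(delivered + " failures=" + sink.failures());
    }
}
```

Note that the failed record is silently lost in this sketch, so it only makes sense where dropping data is acceptable.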


> In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke 
> should be made final
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14034
>                 URL: https://issues.apache.org/jira/browse/FLINK-14034
>             Project: Flink
>          Issue Type: Wish
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Niels van Kaam
>            Priority: Trivial
>
> It is not possible to override the invoke method of the FlinkKafkaProducer, 
> because its first parameter, KafkaTransactionState, is a private inner class. 
> It is also not possible to override the original invoke of SinkFunction, 
> because TwoPhaseCommitSinkFunction (which FlinkKafkaProducer extends) 
> overrides the original invoke method as final.
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java]
> If there is a particular reason for this, it would be better to make the 
> invoke method in FlinkKafkaProducer final as well, and document the reason 
> so that it is clear this is by design (I don't see any overrides in the 
> same package).
> Otherwise, I would make the KafkaTransactionState publicly visible. I would 
> like to override the invoke method to create a custom KafkaProducer which 
> performs some additional generic validations and transformations. (This can 
> also be done in a process function, but a custom sink would simplify the code 
> of jobs.)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
