[ 
https://issues.apache.org/jira/browse/FLINK-14034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282743#comment-17282743
 ] 

Andrew Roberts edited comment on FLINK-14034 at 2/10/21, 10:16 PM:
-------------------------------------------------------------------

Just to add another use case here: the Kafka cluster at the end of this sink 
can throw an exception, and that exception can be deterministic (one example: 
message too large). In the deterministic case, a checkpointed job enters a 
restart loop it cannot recover from, because the same record fails again after 
every restart. The only solution we've found is to cancel the job and restart 
it without savepoints or checkpoints, which results in a data loss window.

In the past, by simply extending FlinkKafkaProducer08 to capture certain 
errors, we were able to let our jobs survive these sorts of failures. This 
strategy doesn't seem possible with the current version of Flink / 
flink-kafka-connector. [~sewen] do you have any suggestions for surviving 
these sorts of error cases if we can't interact with `invoke`?

Edit: It looks like `logFailuresOnly` is something of a way forward, but it 
applies to every send failure and doesn't allow for per-exception control the 
way overriding does (FLINK-14943).
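
As a rough sketch of what per-exception control could look like: the class 
below is plain Java, not the Flink or Kafka API. `SendErrorPolicy`, `handle` 
and `skipped` are hypothetical names used only for illustration. The idea is 
that deterministic errors (such as a message that is too large) are recorded 
and skipped, while everything else still fails the job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical policy: decide per exception whether to skip a record or fail. */
final class SendErrorPolicy {
    // Returns true for errors considered deterministic and safe to skip.
    private final Predicate<Throwable> skippable;
    // Records that were dropped, kept so they can be logged or inspected later.
    private final List<String> skipped = new ArrayList<>();

    SendErrorPolicy(Predicate<Throwable> skippable) {
        this.skippable = skippable;
    }

    /** Swallows skippable errors and remembers the record; rethrows all others. */
    void handle(String record, RuntimeException error) {
        if (skippable.test(error)) {
            skipped.add(record);
            return;
        }
        throw error;
    }

    List<String> skipped() {
        return skipped;
    }
}
```

With `logFailuresOnly`, by contrast, there is no such predicate: every send 
failure is logged rather than failing the job.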


> In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke 
> should be made final
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14034
>                 URL: https://issues.apache.org/jira/browse/FLINK-14034
>             Project: Flink
>          Issue Type: Wish
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Niels van Kaam
>            Priority: Trivial
>
> It is not possible to override the invoke method of FlinkKafkaProducer, 
> because its first parameter, KafkaTransactionState, is a private inner class. 
> It is also not possible to override the original invoke of SinkFunction, 
> because TwoPhaseCommitSinkFunction (which FlinkKafkaProducer extends) 
> overrides that method and declares it final.
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java]
> If there is a particular reason for this, it would be better to make the 
> invoke method in FlinkKafkaProducer final as well and document the reason, 
> so that it is clear this is by design (I don't see any overrides in the 
> same package).
> Otherwise, I would make KafkaTransactionState publicly visible. I would 
> like to override the invoke method to create a custom KafkaProducer that 
> performs some additional generic validations and transformations. (This can 
> also be done in a process function, but a custom sink would simplify the 
> job code.)
>  
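
The validation-and-transformation idea in the description can be sketched 
with composition instead of overriding. This is plain Java under stated 
assumptions: `RecordSink` and `ValidatingSink` are hypothetical names, not 
Flink interfaces, and the wrapper ignores checkpointing and transactions 
entirely.

```java
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in for a sink; not a Flink interface. */
interface RecordSink<T> {
    void write(T value);
}

/** Validates and transforms each record, then hands it to the wrapped sink. */
final class ValidatingSink<T> implements RecordSink<T> {
    private final RecordSink<T> delegate;
    private final Predicate<T> isValid;
    private final Function<T, T> transform;

    ValidatingSink(RecordSink<T> delegate, Predicate<T> isValid, Function<T, T> transform) {
        this.delegate = delegate;
        this.isValid = isValid;
        this.transform = transform;
    }

    @Override
    public void write(T value) {
        // Reject invalid records before they reach the wrapped sink.
        if (!isValid.test(value)) {
            throw new IllegalArgumentException("Record failed validation: " + value);
        }
        delegate.write(transform.apply(value));
    }
}
```

In an actual Flink job the same step would more likely live in a map or 
process function placed ahead of the sink, as the description itself notes.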



