[ 
https://issues.apache.org/jira/browse/FLINK-30998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686802#comment-17686802
 ] 

Leonid Ilyevsky commented on FLINK-30998:
-----------------------------------------

I actually looked at the code related to ActionRequestFailureHandler in the 
org.apache.flink.streaming.connectors.opensearch package.

It is quite complicated, processing every single element there. Maybe this is 
why it was not yet ported to the new package.

I think it does not have to be this complicated. Maybe a simple interface like 
this will do:



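import java.io.Serializable;

// Receives a failure coming from Opensearch so client code can examine and handle it.
public interface FailureHandler extends Serializable {

    void onFailure(Throwable failure);
}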
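
To illustrate how such a handler might cover the duplicate-ID use case from the issue description, here is a minimal sketch using only the JDK. DuplicateIdException, IgnoreDuplicatesHandler and reportFailures are hypothetical names made up for this example; they are not part of the connector or of Opensearch, and a plain RuntimeException stands in for the FlinkRuntimeException that the writer throws today.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class FailureHandlerSketch {

    /** The interface proposed in this comment. */
    public interface FailureHandler extends Serializable {
        void onFailure(Throwable failure);
    }

    /** Hypothetical stand-in for a "document ID already exists" error; not a real Opensearch class. */
    public static class DuplicateIdException extends RuntimeException {
        public DuplicateIdException(String message) {
            super(message);
        }
    }

    /** Ignores duplicate-ID failures and rethrows everything else. */
    public static class IgnoreDuplicatesHandler implements FailureHandler {
        private static final long serialVersionUID = 1L;
        private int ignored = 0;

        @Override
        public void onFailure(Throwable failure) {
            if (failure instanceof DuplicateIdException) {
                ignored++; // tolerated: the document was already indexed under this ID
                return;
            }
            // Anything else still fails the job, as it does without a handler.
            throw new RuntimeException(failure);
        }

        public int ignoredCount() {
            return ignored;
        }
    }

    /** Hypothetical writer hook: passes each failure to the handler instead of throwing directly. */
    static void reportFailures(List<Throwable> failures, FailureHandler handler) {
        for (Throwable failure : failures) {
            handler.onFailure(failure);
        }
    }

    public static void main(String[] args) {
        IgnoreDuplicatesHandler handler = new IgnoreDuplicatesHandler();
        List<Throwable> failures = new ArrayList<>();
        failures.add(new DuplicateIdException("id 42 already exists"));
        failures.add(new DuplicateIdException("id 43 already exists"));
        reportFailures(failures, handler);
        System.out.println("ignored=" + handler.ignoredCount()); // prints "ignored=2"
    }
}
```

In this sketch the handler decides per failure whether the job continues; how the real writer would extract individual failures from a bulk response is left open here.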

 

I even tried implementing it myself, and it looks fine. However, in my environment 
I have trouble building the whole project cleanly (failing tests, some quarantined 
dependencies, etc.). I used the 
[dependabot/maven/flink-connector-opensearch/org.opensearch-opensearch-2.5.0|https://github.com/apache/flink-connector-opensearch/tree/dependabot/maven/flink-connector-opensearch/org.opensearch-opensearch-2.5.0]
 branch because I need support for Opensearch 2.
So maybe somebody can make these enhancements; I would really 
appreciate that. We can work together, and I can share the code fragments that I 
changed.

> Add optional exception handler to flink-connector-opensearch
> ------------------------------------------------------------
>
>                 Key: FLINK-30998
>                 URL: https://issues.apache.org/jira/browse/FLINK-30998
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Opensearch
>    Affects Versions: 1.16.1
>            Reporter: Leonid Ilyevsky
>            Priority: Major
>
> Currently, when there is a failure coming from Opensearch, a 
> FlinkRuntimeException is thrown from the OpensearchWriter.java code (line 346). 
> This makes the Flink pipeline fail. There is no way to handle the exception 
> in the client code.
> I suggest adding an option to set a failure handler, similar to the way it is 
> done in the elasticsearch connector. This way the client code has a chance to 
> examine the failure and handle it.
> Here is a use case where it would be very useful. We are using 
> streams on the Opensearch side, and we are setting our own document IDs. 
> Sometimes these IDs are duplicated; we need to ignore this situation and 
> continue (this is how it works for us with Elasticsearch).
> However, with the opensearch connector, the error comes back, saying that the 
> batch failed (even though most of the documents were indexed and only the ones 
> with duplicated IDs were rejected), and the whole Flink job fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
