[ https://issues.apache.org/jira/browse/FLINK-5353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15848248#comment-15848248 ]

ASF GitHub Bot commented on FLINK-5353:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3246

    [FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink

    Only the last commit is relevant. This PR is based on #3112 so that the functionality is added for all Elasticsearch versions.
    
    It is also based on the work of @static-max in #2861, but with improvements for a more general approach that solves both [FLINK-5353](https://issues.apache.org/jira/browse/FLINK-5353) and [FLINK-5122](https://issues.apache.org/jira/browse/FLINK-5122). The PR is more of a preview of the functionality for our Elasticsearch users, as proper tests for the expected behaviours and Javadoc updates are still pending.
    
    With this PR, users can now provide an `ActionRequestFailureHandler` that controls how a failed Elasticsearch request is dealt with.
    
    Example:
    
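    ```java
    import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.elasticsearch.ElasticsearchParseException;
    import org.elasticsearch.action.ActionRequest;
    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

    public class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
        @Override
        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
            if (failure instanceof EsRejectedExecutionException) {
                // ES node queue is saturated: re-add the request so it is tried again
                indexer.add(action);
                return false;
            } else if (failure instanceof ElasticsearchParseException) {
                // malformed document: simply drop the request without failing the sink
                return false;
            } else {
                // for all other failures, fail the sink
                return true;
            }
        }
    }
    ```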
    
    The example above lets the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink fails. The handler is provided to the constructor of `ElasticsearchSink`.
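    To make the return-value contract concrete, here is a dependency-free sketch that models it in plain Java. All nested types (`ActionRequest`, `RequestIndexer`, `RejectedExecution`, `MalformedDocument`) and the `dispatch` method are hypothetical stand-ins, not the real Flink or Elasticsearch classes; the sketch only assumes what this PR describes: returning `true` fails the sink, returning `false` keeps it running, and re-adding through the indexer re-queues the request.

    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Dependency-free model of the failure-handler contract described in this PR.
    // All nested types are hypothetical stand-ins, not the real Flink/Elasticsearch classes.
    class FailureHandlerSketch {

        static class ActionRequest {
            final String id;
            ActionRequest(String id) { this.id = id; }
        }

        // Stand-ins for EsRejectedExecutionException and ElasticsearchParseException.
        static class RejectedExecution extends RuntimeException {}
        static class MalformedDocument extends RuntimeException {}

        interface RequestIndexer {
            void add(ActionRequest... requests);
        }

        interface ActionRequestFailureHandler {
            // Returns true to fail the sink, false to keep it running.
            boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer);
        }

        // Same decisions as the example handler, expressed against the stand-in types.
        static class ExampleHandler implements ActionRequestFailureHandler {
            @Override
            public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
                if (failure instanceof RejectedExecution) {
                    indexer.add(action);   // re-queue the request
                    return false;
                } else if (failure instanceof MalformedDocument) {
                    return false;          // drop the request
                }
                return true;               // fail the sink
            }
        }

        // Requests re-added by the handler (hypothetical sink-side bookkeeping).
        final List<ActionRequest> requeued = new ArrayList<>();

        // Hypothetical: reports what happened to one failed request.
        String dispatch(ActionRequestFailureHandler handler, ActionRequest action, Throwable failure) {
            int before = requeued.size();
            boolean failSink = handler.onFailure(action, failure,
                    requests -> Collections.addAll(requeued, requests));
            if (failSink) {
                return "failed";
            }
            return requeued.size() > before ? "requeued" : "dropped";
        }

        public static void main(String[] args) {
            FailureHandlerSketch sink = new FailureHandlerSketch();
            ExampleHandler handler = new ExampleHandler();
            System.out.println(sink.dispatch(handler, new ActionRequest("a"), new RejectedExecution()));
            System.out.println(sink.dispatch(handler, new ActionRequest("b"), new MalformedDocument()));
            System.out.println(sink.dispatch(handler, new ActionRequest("c"), new IllegalStateException("boom")));
        }
    }
    ```

    Running `main` prints `requeued`, `dropped` and `failed`, one per line.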
    
    Note that the `onFailure` method is called only after the internal `BulkProcessor` finishes all backoff retry attempts for temporary `EsRejectedExecutionException`s (saturated ES node queue capacity).
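    The ordering described above (backoff retries first, handler afterwards) can be sketched as follows. This is a simplified, hypothetical model, not the `BulkProcessor` internals: `RejectedExecution` stands in for `EsRejectedExecutionException`, the handler is a plain `Predicate<Throwable>` returning `true` to fail the sink, and the doubling delay schedule is an assumption chosen for illustration. Delays are recorded instead of slept so the sketch runs instantly.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    // Hypothetical model of "backoff retries first, failure handler afterwards".
    // Not the Elasticsearch BulkProcessor internals; types and schedule are assumptions.
    class BackoffThenHandlerSketch {

        // Stand-in for EsRejectedExecutionException (saturated node queue).
        static class RejectedExecution extends RuntimeException {}

        // Delays the sketch would have waited, recorded instead of sleeping.
        final List<Long> delaysMillis = new ArrayList<>();

        /**
         * Runs the attempt, retrying only rejections, up to maxRetries times with a
         * doubling delay. The handler (true = fail the sink) is consulted once retries
         * are exhausted, or immediately for any other failure.
         * Returns true if the sink should fail.
         */
        boolean execute(Runnable attempt, int maxRetries, long initialDelayMillis,
                        Predicate<Throwable> handler) {
            long delay = initialDelayMillis;
            for (int retry = 0; ; retry++) {
                try {
                    attempt.run();
                    return false;                   // success: nothing for the handler
                } catch (RejectedExecution e) {
                    if (retry >= maxRetries) {
                        return handler.test(e);     // backoff exhausted
                    }
                    delaysMillis.add(delay);        // a real sink would wait here
                    delay *= 2;                     // assumed exponential schedule
                } catch (RuntimeException e) {
                    return handler.test(e);         // not retryable: handler right away
                }
            }
        }

        public static void main(String[] args) {
            BackoffThenHandlerSketch sketch = new BackoffThenHandlerSketch();
            boolean failSink = sketch.execute(
                    () -> { throw new RejectedExecution(); }, 3, 100L, t -> true);
            System.out.println(sketch.delaysMillis + " failSink=" + failSink);
        }
    }
    ```

    Running `main` prints `[100, 200, 400] failSink=true`: three backoff waits, then a single handler call.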
    
    ### Alternatives:
    
    1. Currently, every failure reported in the `afterBulk` callback is passed to the handler's `onFailure`. Instead, we could pass only specific exceptions and let the user decide how to handle those.
    
    2. The original `ElasticsearchSinkFunction` and the new `ActionRequestFailureHandler` interface could perhaps be integrated into one.
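    Alternative 1 could look roughly like the hypothetical sketch below: only failures whose type is in a configured list reach the user's handler, and everything else fails the sink. `SelectiveDispatchSketch`, `onBulkItemFailure` and the `Predicate<Throwable>` handler shape are illustrative assumptions, not part of this PR; standard JDK exceptions are used in place of Elasticsearch ones so the sketch compiles on its own.

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Predicate;

    // Hypothetical sketch of alternative 1: only selected exception types are handed
    // to the user's handler; every other failure fails the sink.
    class SelectiveDispatchSketch {

        private final List<Class<? extends Throwable>> forwardedTypes;
        private final Predicate<Throwable> userHandler;   // returns true to fail the sink

        SelectiveDispatchSketch(List<Class<? extends Throwable>> forwardedTypes,
                                Predicate<Throwable> userHandler) {
            this.forwardedTypes = forwardedTypes;
            this.userHandler = userHandler;
        }

        /** Returns true if the sink should fail for this failure. */
        boolean onBulkItemFailure(Throwable failure) {
            for (Class<? extends Throwable> type : forwardedTypes) {
                if (type.isInstance(failure)) {
                    return userHandler.test(failure);   // user decides for listed types
                }
            }
            return true;                                // anything else fails the sink
        }

        public static void main(String[] args) {
            // Forward only IllegalArgumentException (and subclasses); the user drops them.
            SelectiveDispatchSketch dispatch = new SelectiveDispatchSketch(
                    Arrays.<Class<? extends Throwable>>asList(IllegalArgumentException.class),
                    failure -> false);
            System.out.println(dispatch.onBulkItemFailure(new NumberFormatException()));  // false
            System.out.println(dispatch.onBulkItemFailure(new IllegalStateException()));  // true
        }
    }
    ```

    One design consequence: subclasses of a forwarded type (here `NumberFormatException`, a subclass of `IllegalArgumentException`) are forwarded too, because the check uses `Class.isInstance`.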

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5353

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3246.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3246
    
----
commit bf84c0aa91924aca779189b628a656d9b54e36db
Author: Mike Dias <mike.rodrigues.d...@gmail.com>
Date:   2016-11-07T20:09:48Z

    [FLINK-4988] Elasticsearch 5.x support

commit 4efb2d497759b3688fe80261df19bb1e1c3f1c21
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-01-12T13:21:56Z

    [FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors

commit be35862383b69c0d65fefd2c48c772a81fceb8d5
Author: Max Kuklinski <max.kuklin...@live.de>
Date:   2016-11-23T16:54:11Z

    [FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.
    
    Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full

commit fa67e8be5ca8e90d47ad12e947eac7b695e8fcca
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-01-30T05:55:26Z

    [FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink
    
    This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a
    failure handler to control how failed action requests are dealt with.
    
    The commit also includes general improvements to FLINK-5122:
    1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not
       available for Elasticsearch 1.x)
    2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler

----


> Elasticsearch Sink loses well-formed documents when there are malformed documents
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5353
>                 URL: https://issues.apache.org/jira/browse/FLINK-5353
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.3
>            Reporter: Flavio Pompermaier
>            Assignee: Tzu-Li (Gordon) Tai
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
