Hi,

I’m trying to implement a failure handler for ElasticSearch from the example in 
the Flink documentation

DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throw Throwable {

            if (ExceptionUtils.containsThrowable(failure, 
EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, 
ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also 
choose to throw custom exceptions
                throw failure;
            }
        }
}));

However, I can only find ExceptionUtils.containsThrowable in Flink 1.3. It is 
not present in 1.8. Am I mistaken, or if I’m not how I can I implement it using 
findThrowable ?

TIA

Nick

Reply via email to