Hi, I’m trying to implement a failure handler for Elasticsearch, based on the example in the Flink documentation:

    DataStream<String> input = ...;

    input.addSink(new ElasticsearchSink<>(
        config, transportAddresses,
        new ElasticsearchSinkFunction<String>() {...},
        new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
                if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                    // full queue; re-add document for indexing
                    indexer.add(action);
                } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                    // malformed document; simply drop request without failing sink
                } else {
                    // for all other failures, fail the sink
                    // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                    throw failure;
                }
            }
        }));

However, I can only find ExceptionUtils.containsThrowable in Flink 1.3. It is not present in 1.8. Am I mistaken? If not, how can I implement this using findThrowable?

TIA

Nick
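
For reference, here is a minimal sketch of how the three branches of the handler above might look with a findThrowable-style check. It assumes findThrowable takes the throwable and a class and returns an Optional, so that containsThrowable(failure, X.class) would become findThrowable(failure, X.class).isPresent(). To keep the sketch runnable without Flink or Elasticsearch on the classpath, findThrowable is a local stand-in that walks the cause chain, and RejectedExecution, ParseFailure and classify are hypothetical names, not part of either library.

```java
import java.util.Optional;

public class FindThrowableSketch {

    // Hypothetical stand-ins for EsRejectedExecutionException and
    // ElasticsearchParseException, so the sketch has no external dependencies.
    static class RejectedExecution extends RuntimeException {
        RejectedExecution(String message) { super(message); }
    }

    static class ParseFailure extends RuntimeException {
        ParseFailure(String message) { super(message); }
    }

    // Local stand-in (assumption): walks the cause chain and returns the first
    // throwable that is an instance of searchType, or an empty Optional.
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> searchType) {
        Throwable current = throwable;
        while (current != null) {
            if (searchType.isInstance(current)) {
                return Optional.of(searchType.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // Mirrors the three branches of the failure handler:
    // re-add the request, drop it, or fail the sink.
    static String classify(Throwable failure) {
        if (findThrowable(failure, RejectedExecution.class).isPresent()) {
            return "RETRY";   // full queue; the handler would call indexer.add(action)
        } else if (findThrowable(failure, ParseFailure.class).isPresent()) {
            return "DROP";    // malformed document; drop without failing the sink
        } else {
            return "FAIL";    // everything else; the handler would rethrow the failure
        }
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("bulk failed", new RejectedExecution("queue full"));
        System.out.println(classify(wrapped));
    }
}
```

In the real handler, the same pattern would use ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent() in place of each containsThrowable call, provided the 1.8 signature matches the assumption above.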