Hello,

I am trying to implement error handling in the Elasticsearch sink (following the seemingly outdated Flink documentation [1]).
<code>
override def onFailure(actionRequest: ActionRequest,
                       failure: Throwable,
                       restStatusCode: Int,
                       indexer: RequestIndexer): Unit = {
  if (ExceptionUtils.findThrowable(failure, classOf[org.elasticsearch.index.engine.VersionConflictEngineException]) != Optional.empty()) {
    LOG.warn("Failed inserting record to ElasticSearch: statusCode {} message: {} record: {} stacktrace {}.\nRetrying",
      restStatusCode.toString, failure.getMessage, actionRequest.toString, failure.getStackTrace)
    // Do something here
  } else {
    LOG.error(s"ELASTICSEARCH FAILED:\n statusCode $restStatusCode\n message: ${failure.getMessage}\n${failure.getStackTrace}")
  }
}
</code>

I tried to handle VersionConflictEngineException differently from other failures, but it did not work. Execution always reaches the "else" branch, so my log message is:

ELASTICSEARCH FAILED:
 statusCode 409
 message: Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]

Thanks and best regards,
Averell

[1] handling-failing-elasticsearch-requests
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests>