Hello,

I am trying to implement error handling in the Elasticsearch sink, following the
seemingly outdated Flink documentation [1]:

<code>
        override def onFailure(actionRequest: ActionRequest, failure: Throwable,
                               restStatusCode: Int, indexer: RequestIndexer): Unit = {
                if (ExceptionUtils.findThrowable(failure,
                        classOf[org.elasticsearch.index.engine.VersionConflictEngineException]) != Optional.empty()) {
                        LOG.warn("Failed inserting record to ElasticSearch: statusCode {} message: {} record: {} stacktrace {}.\nRetrying",
                                restStatusCode.toString, failure.getMessage, actionRequest.toString, failure.getStackTrace)
                        // Do something here
                }
                else {
                        LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}\n${failure.getStackTrace}")
                }
        }
</code>

I tried to handle VersionConflictEngineException separately, but it did not
work: execution always ends up in the "else" branch, so my log message is:

ELASTICSEARCH FAILED:
    statusCode 409
    message: Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][...]: version conflict, document already exists (current version [1])]

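For illustration only, a check based on the two values visible in the log above (status code 409 and the type=version_conflict_engine_exception marker in the message) could look like the sketch below. The object VersionConflictCheck and its method isVersionConflict are hypothetical names, not part of Flink or Elasticsearch, and whether matching on these values is the recommended approach is an assumption.

```scala
// Hypothetical helper, not part of Flink or Elasticsearch.
object VersionConflictCheck {
  // HTTP 409 Conflict, the status code shown in the log output.
  val ConflictStatus = 409
  // Marker string copied from the logged exception message.
  val ConflictType = "type=version_conflict_engine_exception"
  // Upper bound on how far the cause chain is followed (guards against cycles).
  val MaxDepth = 32

  /** True if the status code, or any message in the cause chain,
    * points to a version conflict. */
  def isVersionConflict(failure: Throwable, restStatusCode: Int): Boolean = {
    @scala.annotation.tailrec
    def messageMatches(t: Throwable, depth: Int): Boolean =
      if (t == null || depth >= MaxDepth) false
      else if (Option(t.getMessage).exists(_.contains(ConflictType))) true
      else messageMatches(t.getCause, depth + 1)

    restStatusCode == ConflictStatus || messageMatches(failure, 0)
  }
}
```

Inside onFailure, the condition of the "if" branch would then become VersionConflictCheck.isVersionConflict(failure, restStatusCode), using the parameters that the handler already receives.
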
Thanks and best regards,
Averell

[1] Handling Failing Elasticsearch Requests
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests>
  



