Hi,
I need help handling errors from the Elasticsearch sink, as shown below:
2019-11-19 08:09:09,043 ERROR
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase -
Failed Elasticsearch item request:
[flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[75:108]: version conflict,
document already exists (current version [1])]]
[flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]]
ElasticsearchException[Elasticsearch exception
[type=version_conflict_engine_exception, reason=[75:108]: version conflict,
document already exists (current version [1])]]
    at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
    at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
    at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
    at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
    at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
    at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
    at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
    at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
    at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
    at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)
The error is expected: I deliberately create documents with duplicate ids, so
that only new data is loaded when I re-run a batch that was previously only
partially loaded, or when I upload a document a second time after a timeout to
make sure it was definitely loaded and not lost.
The document is created as follows:
  // Field map that becomes the document source
  val json = new util.HashMap[String, Any]
  json.put("arrayinstance", esid)
  json.put("bearing", element._1)
  json.put("sampleindex", element._2)
  json.put("sample", element._3)
  json.put("hashstring", element._4)
  json.put("priorrepeats", element._5)
  // create(true) makes Elasticsearch reject the request if the id already
  // exists, which is what produces the version conflict above
  Requests.indexRequest()
    .index("flink-index-deduplicated")
    .`type`("_doc")
    .id(element._1 + ":" + element._2)
    .create(true)
    .source(json)
}
My problem is: how can I catch this failure, recover, and carry on? I have set
a failure handler, shown below, which needs extending to handle the failure
above:
// Imports this snippet relies on
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.apache.flink.util.ExceptionUtils
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

esSinkBuilder.setFailureHandler(
  new ActionRequestFailureHandler() {
    @throws(classOf[Throwable])
    override def onFailure(action: ActionRequest, failure: Throwable,
                           restStatusCode: Int, indexer: RequestIndexer): Unit = {
      if (ExceptionUtils.findThrowable(failure,
          classOf[EsRejectedExecutionException]).isPresent) {
        // Bulk queue was full: queue the request again
        Job.LOG.info("ElasticSearch full queue; re-added document for indexing")
        indexer.add(action)
      } else if (ExceptionUtils.findThrowable(failure,
          classOf[ElasticsearchParseException]).isPresent) {
        // Malformed document: drop it
        LOG.info("Malformed ElasticSearch document. Document dropped")
      } else if (ExceptionUtils.findThrowable(failure,
          classOf[java.net.SocketTimeoutException]).isPresent) {
        // Timeout: queue the request again
        LOG.info("ElasticSearch document timeout; re-added document for indexing")
        indexer.add(action)
      } /* else if (ExceptionUtils.findThrowable(failure,
          classOf[]).isPresent) {
        LOG.info("ElasticSearch document duplicate; ignored document")
      } */ else {
        // for all other failures, fail the sink
        // here the failure is simply rethrown, but users can also choose
        // to throw custom exceptions
        Job.LOG.info(failure.getMessage)
        throw failure
      }
    }
  }
)
I have tried simply ignoring the failure by removing the "throw failure", but
to no avail.
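
One possible way to fill in the commented-out branch of the failure handler above, as a rough sketch rather than a confirmed fix: instead of looking for a specific exception class, decide from the REST status code and the error type string. The object and method names (DuplicateDocumentCheck, isDuplicateDocument) are hypothetical. The sketch assumes that Elasticsearch reports a version conflict with HTTP status 409, and that the "version_conflict_engine_exception" text seen in the log above appears in the message of the failure or of one of its causes.

```scala
// Sketch only: a pure helper, independent of Flink, that decides whether a
// failed bulk item looks like a "document already exists" conflict.
// All names here are hypothetical and not part of any library.
object DuplicateDocumentCheck {

  // HTTP 409 Conflict; assumed to be the status returned for version conflicts.
  val HttpConflict: Int = 409

  // Error type string copied from the log output in this post.
  val VersionConflictType: String = "version_conflict_engine_exception"

  // Walks the cause chain (bounded, to guard against cyclic causes) and checks
  // whether any message mentions the version conflict error type.
  private def mentionsVersionConflict(failure: Throwable): Boolean =
    Iterator.iterate(failure)(_.getCause)
      .takeWhile(_ != null)
      .take(32)
      .exists(t => Option(t.getMessage).exists(_.contains(VersionConflictType)))

  // Either signal is treated as sufficient in this sketch.
  def isDuplicateDocument(failure: Throwable, restStatusCode: Int): Boolean =
    restStatusCode == HttpConflict || mentionsVersionConflict(failure)
}
```

With such a helper, the commented-out branch could read `else if (DuplicateDocumentCheck.isDuplicateDocument(failure, restStatusCode))` followed by the existing "duplicate; ignored document" log call and no rethrow. Since the ERROR line in the log is written by ElasticsearchSinkBase itself, it may well keep appearing even when the handler ignores the failure, so the log alone may not show whether the handler worked.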