Hi Averell,

This seems to be the bug that you encountered:
https://issues.apache.org/jira/browse/FLINK-11046.
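As a side note: the retry decision in your handler can be pulled out into a pure function and unit-tested without a cluster. Below is a minimal sketch of that idea — the `Request`/`Decision` case classes are hypothetical stand-ins for the Elasticsearch request types, not the Flink or Elasticsearch API:

```scala
// Hypothetical stand-ins for ActionRequest subtypes, so the retry
// decision can be exercised in isolation.
sealed trait Request
case class Update(id: String, version: Long) extends Request
case class Index(id: String) extends Request
case class Delete(id: String) extends Request

sealed trait Decision
case class Retry(r: Request) extends Decision
case object Fail extends Decision

// Mirrors the branching in the quoted onFailure(): bump the version and
// retry on a version conflict, retry as-is on a closed connection,
// otherwise give up.
def decide(r: Request, restStatusCode: Int, message: String): Decision = {
  if (message.contains("version_conflict_engine_exception")) {
    r match {
      case u: Update => Retry(u.copy(version = u.version + 1))
      case _         => Fail // conflict on a non-update is unexpected
    }
  } else if (restStatusCode == -1 && message.contains("Connection closed")) {
    r match {
      case u: Update => Retry(u)
      case i: Index  => Retry(i)
      case _         => Fail // e.g. a delete: not retried in the quoted code
    }
  } else Fail
}
```

Note that the connection-closed `match` in your original handler has no default case, so any request type other than update/index would throw a `MatchError` there.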
Cheers,
Gordon

On Sat, Feb 9, 2019 at 3:27 PM Averell <lvhu...@gmail.com> wrote:
> Hello,
>
> I am trying to follow this Flink guide [1] to handle errors in
> ElasticSearchSink by re-adding the failed messages to the queue.
> The error scenarios that I got and am going to retry are: (i) a conflict
> in the UpdateRequest document version and (ii) a lost connection to
> ElasticSearch. These errors are expected to be transient: (i) should be
> solved by changing the version, and (ii) should be gone after some
> seconds.
> What I expected was that the message would be retried successfully.
> What I actually got was: Flink seemed to get stuck on that (first) retry,
> my flow queued up (backpressure is 1 everywhere), and all processing hung.
>
> Here is my error handling code:
>
> <code>
> private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
>   override def onFailure(actionRequest: ActionRequest,
>                          failure: Throwable,
>                          restStatusCode: Int,
>                          indexer: RequestIndexer): Unit = {
>     if (ExceptionUtils.findThrowableWithMessage(failure,
>         "version_conflict_engine_exception") != Optional.empty()) {
>       actionRequest match {
>         case s: UpdateRequest =>
>           LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
>           LOG.warn(actionRequest.toString)
>           indexer.add(s.version(s.version() + 1))
>         case _ =>
>           LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
>           LOG.error(actionRequest.toString)
>           throw failure
>       }
>     } else if (restStatusCode == -1 && failure.getMessage.contains("Connection closed")) {
>       LOG.warn(s"Retrying record: ${actionRequest.toString}")
>       actionRequest match {
>         case s: UpdateRequest => indexer.add(s)
>         case s: IndexRequest => indexer.add(s)
>       }
>     } else {
>       LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}\n${failure.getStackTrace}")
>       LOG.error(s"    DATA:\n    ${actionRequest.toString}")
>       throw failure
>     }
>   }
> }
> </code>
>
> Here is the extract from my task-manager logs:
>
> /2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR
> o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase - Failed
> Elasticsearch bulk request: Connection closed
> 2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN
> c.n.c......sink.MyElasticSearchSink$ - Retrying record: update
> {[idx-20190208][_doc][doc_id_1549622700000], doc_as_upsert[true],
> doc[index {*[null][null][null]*, source[{...}]}],
> scripted_upsert[false], detect_noop[true]}
> 2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO
> o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0
> checkpointing for checkpoint with id=24 (max part counter=26)./
>
> And job-manager logs:
>
> /2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes
> in 307078 ms).
> 2019-02-09 04:09:30.970 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
> 2019-02-09 04:17:00.970 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 24
> of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
> 2019-02-09 04:24:31.035 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
> 2019-02-09 04:32:01.035 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 25
> of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
> 2019-02-09 04:39:30.961 [Checkpoint Timer] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1./
>
> Thanks and best regards,
> Averell
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/