Hi Mingliang,

We already have a mechanism for detecting and propagating fatal/non-retryable
exceptions [1]. We can use that in Elasticsearch similar to what we do for the
AWS connectors [2]. You can also check the AWS connectors for how to add a
fail-fast mechanism that disables retrying altogether.
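
To illustrate, here is a rough sketch of what such a classifier chain could
look like on the Elasticsearch side, loosely modelled on how the DynamoDB
writer builds its chain in [2]. Only FatalExceptionClassifier is the real
class from [1]; the exception types below are made-up placeholders, not
actual connector code:

    import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;

    /**
     * Sketch only: hypothetical exception types the Elasticsearch 8 writer
     * could throw for non-retryable bulk item failures, plus a classifier
     * chain that marks them as fatal.
     */
    public class Elasticsearch8FatalExceptionClassifiers {

        /** Hypothetical: thrown when a bulk item fails with 400 (bad request). */
        public static class ElasticsearchBadRequestException extends RuntimeException {
            public ElasticsearchBadRequestException(String message) {
                super(message);
            }
        }

        /** Hypothetical: thrown when a bulk item fails with 404 (not found). */
        public static class ElasticsearchResourceNotFoundException extends RuntimeException {
            public ElasticsearchResourceNotFoundException(String message) {
                super(message);
            }
        }

        /** Hypothetical user-facing exception for non-retryable failures. */
        public static class ElasticsearchSinkException extends RuntimeException {
            public ElasticsearchSinkException(String message, Throwable cause) {
                super(message, cause);
            }
        }

        /** Chain the writer could consult before re-queuing failed items. */
        public static FatalExceptionClassifier buildClassifierChain() {
            return FatalExceptionClassifier.createChain(
                    FatalExceptionClassifier.withRootCauseOfType(
                            ElasticsearchBadRequestException.class,
                            err -> new ElasticsearchSinkException(
                                    "Encountered non-retryable error: bad request (400).", err)),
                    FatalExceptionClassifier.withRootCauseOfType(
                            ElasticsearchResourceNotFoundException.class,
                            err -> new ElasticsearchSinkException(
                                    "Encountered non-retryable error: not found (404).", err)));
        }
    }

The writer would then check failures against this chain, roughly as the
DynamoDB writer does around the line linked in [2], and hand fatal errors to
the base sink's fatal-exception consumer instead of re-queuing them.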

> FLIP-451 proposes timeout for retrying which helps with un-acknowledged
> requests, but not addressing the case when request gets processed and
> failed items keep failing no matter how many times we retry. Correct me if
> I'm wrong

Yes, you are correct. This is mainly to mitigate issues arising from
incorrect handling of requests in sink implementations. Failure handling
itself has always been assumed to be the sink implementation's
responsibility. This is done at 3 levels:
- Classifying fatal exceptions, as mentioned above.
- Adding configuration to disable retries, as mentioned above as well.
- Adding a mechanism to limit retries, as in the proposed ticket for the
AWS connectors [3].

In my opinion, at least 1 and 3 are useful in this case for Elasticsearch:
adding classifiers and a retry-limiting mechanism for Elasticsearch.

> Or we can allow users to configure
> "drop/fail" behavior for non-retriable errors

I am not sure I follow this proposal, but in general, while "dropping"
records seems to boost reliability, it breaks the at-least-once semantics,
and if we don't have proper tracing and debugging mechanisms we will be
shooting ourselves in the foot.

1- https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java
2- https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L227
3- https://issues.apache.org/jira/browse/FLINK-35541

Best Regards
Ahmed Hamdy


On Thu, 6 Jun 2024 at 06:53, Mingliang Liu <lium...@apache.org> wrote:

> Hi all,
>
> Currently the Elasticsearch 8 connector retries all items if the request
> fails as a whole, and retries failed items if the request has partial
> failures [1]. I think this infinite retrying might be problematic in some
> cases when retrying can never eventually succeed. For example, if the
> request is 400 (bad request) or 404 (not found), retries do not help. If
> there are too many non-retriable failed items, new requests will get
> processed less effectively. In extreme cases, it may stall the pipeline if
> in-flight requests are occupied by those failed items.
>
> FLIP-451 proposes timeout for retrying which helps with un-acknowledged
> requests, but not addressing the case when request gets processed and
> failed items keep failing no matter how many times we retry. Correct me if
> I'm wrong.
>
> One opinionated option is to fail fast for non-retriable errors like 400 /
> 404 and to drop items for 409. Or we can allow users to configure
> "drop/fail" behavior for non-retriable errors. I prefer the latter. I
> checked how Logstash ingests data to Elasticsearch and it takes a similar
> approach for non-retriable errors [2]. In my day job, we have a
> dead-letter queue in AsyncSinkWriter for failed entries that exhaust
> retries. I guess that is too specific to our setup and seems an overkill
> here for the Elasticsearch connector.
>
> Any thoughts on this?
>
> [1]
> https://github.com/apache/flink-connector-elasticsearch/blob/5d1f8d03e3cff197ed7fe30b79951e44808b48fe/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L152-L170
> [2]
> https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/plugin_mixins/elasticsearch/common.rb#L283-L304
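
As a follow-up to point 3 above ([3]), here is a rough, self-contained
sketch of the kind of bounded-retry policy that could complement the
classifiers. Every name in it is hypothetical and not part of any existing
connector; it only illustrates the idea of capping retries per failed item:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    /**
     * Hypothetical sketch in the spirit of [3]: each failed bulk item carries
     * an attempt counter, and items that exhaust their budget are reported as
     * a fatal error instead of being re-queued indefinitely.
     */
    public class BoundedRetryPolicy<T> {

        private final int maxRetries;

        public BoundedRetryPolicy(int maxRetries) {
            this.maxRetries = maxRetries;
        }

        /** A request entry together with the number of attempts already made. */
        public static final class TrackedEntry<T> {
            public final T entry;
            public final int attempts;

            public TrackedEntry(T entry, int attempts) {
                this.entry = entry;
                this.attempts = attempts;
            }
        }

        /**
         * Returns the entries that may still be retried (with their counters
         * bumped) and reports exhausted ones through the fatal-error consumer.
         */
        public List<TrackedEntry<T>> splitRetryable(
                List<TrackedEntry<T>> failedEntries, Consumer<Exception> fatalErrorConsumer) {
            List<TrackedEntry<T>> retryable = new ArrayList<>();
            for (TrackedEntry<T> failed : failedEntries) {
                if (failed.attempts >= maxRetries) {
                    fatalErrorConsumer.accept(
                            new RuntimeException(
                                    "Bulk item still failing after " + maxRetries
                                            + " retries, giving up."));
                } else {
                    retryable.add(new TrackedEntry<>(failed.entry, failed.attempts + 1));
                }
            }
            return retryable;
        }
    }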