Thank you Ahmed for the explanation.

The current Elasticsearch 8 connector already uses the
FatalExceptionClassifier for fatal / non-retriable requests [1], very
similar to what you linked in the AWS connectors. Currently this is only
applied to fully failed requests. The main problem I was concerned about
is partial failures: the Future of the client bulk request completes
normally, but some items fail according to the response. Those failed
entries are retried infinitely, even though retrying will not always
help.
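
To make the partial-failure case concrete, below is a minimal sketch of
how those per-item failures show up in the bulk response (using the
Elasticsearch 8 Java client types; the method name is just for
illustration):

    import co.elastic.clients.elasticsearch.core.BulkResponse;
    import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;

    // Sketch only: the bulk future completes normally, yet individual
    // items can still fail with their own HTTP status.
    void inspectPartialFailures(BulkResponse response) {
        if (response.errors()) {
            for (BulkResponseItem item : response.items()) {
                if (item.error() != null) {
                    // item.status() is e.g. 400 (bad request), 404 (not
                    // found), 409 (conflict), 429 (too many requests);
                    // only some of these are worth retrying.
                }
            }
        }
    }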

To avoid the problem of "too many failed but non-retriable request
entries in the buffer", I was thinking we could fail fast instead of
retrying infinitely. Alternatively, we could limit the maximum number of
retries per record. Like FLINK-35541 you shared for the AWS connectors, I
think a similar approach in the Elasticsearch 8 connector would be
useful: a non-retriable request entry would still be retried, but would
eventually fail after exhausting the retries. Having both sounds like a
more comprehensive solution, as in the following sample:

    void handlePartiallyUnprocessedRequest(
            Response response, Consumer<List<Request>> requestResult) {
        List<Request> requestsToRetry = new ArrayList<>();

        for (Request r : response.failedItems()) {
            // We don't have the isRetryable check for ES 8 yet; it would
            // catch errors such as 400 / 404. FLINK-35541 could help limit
            // retries for all failed requests via a per-record retry count.
            if (!isRetryable(r.errorCode()) || r.retryCount++ > maxRetry) {
                throw new FlinkRuntimeException(
                        "Request entry failed with a non-retriable error"
                                + " or exhausted retries: " + r);
            }
            requestsToRetry.add(r);
        }

        requestResult.accept(requestsToRetry);
    }
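
The isRetryable check above does not exist in the connector today; a
minimal sketch of what it could look like, with the concrete set of
status codes open for discussion:

    // Hypothetical helper, not existing connector code: client errors
    // that cannot succeed on retry are treated as non-retriable.
    static boolean isRetryable(int statusCode) {
        switch (statusCode) {
            case 400: // bad request, e.g. mapping / parsing errors
            case 404: // index not found
            case 409: // version conflict (or make this a "drop" option)
                return false;
            case 429: // too many requests / bulk queue full
            case 503: // service unavailable
                return true;
            default:
                // conservative default: retry server errors only
                return statusCode >= 500;
        }
    }

With something like that in place, the 409 behavior (drop vs. fail) could
also be made configurable, as discussed earlier in the thread.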

[1]
https://github.com/apache/flink-connector-elasticsearch/blob/da2ef1fa6d5edd3cf1328b11632929fd2c99f567/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L73-L82


On Fri, Jun 7, 2024 at 3:42 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Mingliang,
>
> We already have a mechanism for detecting and propagating
> Fatal/Non-retryable exceptions[1]. We can use that in ElasticSearch similar
> to what we do for AWS connectors[2]. Also, you can check AWS connectors for
> how to add a fail-fast mechanism to disable retrying all along.
>
> > FLIP-451 proposes timeout for retrying which helps with un-acknowledged
> > requests, but not addressing the case when request gets processed and
> > failed items keep failing no matter how many times we retry. Correct me
> if
> > I'm wrong
> >
> yes you are correct, this is mainly to mitigate the issues arising from
> incorrect handling of requests in sink implementers.
> The Failure handling itself has always been assumed to be the Sink
> implementation responsibility, this is done in 3 levels
> - Classifying Fatal exceptions as mentioned above
> - Adding configuration to disable retries as mentioned above as well.
> - Adding mechanism to limit retries as in the proposed ticket for AWS
> connectors[3]
>
> In my opinion at least 1 and 3 are useful in this case for Elasticsearch,
> Adding classifiers and retry mechanisms for elasticsearch.
>
> Or we can allow users to configure
> > "drop/fail" behavior for non-retriable errors
> >
>
> I am not sure I follow this proposal, but in general while "Dropping"
> records seems to boost reliability, it breaks the at-least-once semantics
> and if you don't have proper tracing and debugging mechanisms we will be
> shooting ourselves in the foot.
>
>
> 1-
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java
> 2-
>
> https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L227
>
> 3-https://issues.apache.org/jira/browse/FLINK-35541
> Best Regards
> Ahmed Hamdy
>
>
> On Thu, 6 Jun 2024 at 06:53, Mingliang Liu <lium...@apache.org> wrote:
>
> > Hi all,
> >
> > Currently the Elasticsearch 8 connector retries all items if the request
> > fails as a whole, and retries failed items if the request has partial
> > failures [1]. I think this infinite retrying might be problematic in
> some
> > cases when retrying can never eventually succeed. For example, if the
> > request is 400 (bad request) or 404 (not found), retries do not help. If
> > there are too many failed items non-retriable, new requests will get
> > processed less effectively. In extreme cases, it may stall the pipeline
> if
> > in-flight requests are occupied by those failed items.
> >
> > FLIP-451 proposes timeout for retrying which helps with un-acknowledged
> > requests, but not addressing the case when request gets processed and
> > failed items keep failing no matter how many times we retry. Correct me
> if
> > I'm wrong.
> >
> > One opinionated option is to fail fast for non-retriable errors like 400
> /
> > 404 and to drop items for 409. Or we can allow users to configure
> > "drop/fail" behavior for non-retriable errors. I prefer the latter. I
> > checked how LogStash ingests data to Elasticsearch and it takes a similar
> > approach for non-retriable errors [2]. In my day job, we have a
> > dead-letter-queue in AsyncSinkWriter for failed entries that exhaust
> > retries. I guess that is too specific to our setup and seems an overkill
> > here for Elasticsearch connector.
> >
> > Any thoughts on this?
> >
> > [1]
> >
> >
> https://github.com/apache/flink-connector-elasticsearch/blob/5d1f8d03e3cff197ed7fe30b79951e44808b48fe/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L152-L170
> > [2]
> >
> >
> https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/plugin_mixins/elasticsearch/common.rb#L283-L304
> >
>
