I have updated the PR for review and also added a new test.

https://github.com/apache/flink-connector-elasticsearch/pull/107

Could you take a look, Ahmed?

Thanks,

On Tue, Jun 11, 2024 at 5:23 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Mingliang,
> Yes sounds like a good solution, I am not very familiar with ElasticSearch
> internals and APIs but will try to assist with the PR when ready.
> Best Regards
> Ahmed Hamdy
>
>
> On Tue, 11 Jun 2024 at 07:07, Mingliang Liu <lium...@apache.org> wrote:
>
> > Thank you Ahmed for the explanation.
> >
> > The current Elasticsearch 8 connector already uses the
> > FatalExeptionClassifier for fatal / non-retriable requests [1]. It's very
> > similar to what you linked in the AWS connectors. Currently this is only
> > used for fully failed requests. The main problem I was concerned about is
> > the partial failures, when the Future of the client bulk request was not
> > completed exceptionally, but instead some items failed according to the
> > response. For those failed entries in the partially failed request, we
> > retry infinitely though retrying will not always help.
> >
> > To avoid problems of "too many failed but non-retryable request entries
> in
> > the buffer", I was thinking we can fail fast instead of infinitely
> > retrying. Alternatively, we can limit the maximum number of retrying per
> > rerecord. Like FLINK-35541 you shared for AWS connectors, I think a
> similar
> > approach in Elasticsearch 8 connector would be useful. Given a
> > non-retriable request entry, it will retry the request entries anyway but
> > will eventually fail after exhausting the retries. Having both sound
> like a
> > more comprehensive solution, as following sample:
> >
> >     void handlePartiallyUnprocessedRequest(
> >             Response response, Consumer requestResult) {
> >         List<Request> requestsToRetry = new ArrayList<>();
> >
> >         for (Request r : response.failedItems()) {
> >             if (!isRetryable(r.errorCode())        // we don't have this
> > check for ES 8, which could be 400 / 404
> >                     || r.retryCount++ > maxRetry) {   // FLINK-35541
> could
> > help limit retries for all failed requests
> >                 throw new FlinkRuntimeException();
> >             }
> >             requestsToRetry.add(r);
> >         }
> >
> >         requestResult.accept(requestsToRetry);
> >     }
> >
> > [1]
> >
> >
> https://github.com/apache/flink-connector-elasticsearch/blob/da2ef1fa6d5edd3cf1328b11632929fd2c99f567/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L73-L82
> >
> >
> > On Fri, Jun 7, 2024 at 3:42 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
> >
> > > Hi Mingliang,
> > >
> > > We already have a mechanism for detecting and propagating
> > > Fatal/Non-retryable exceptions[1]. We can use that in ElasticSearch
> > similar
> > > to what we do for AWS connectors[2]. Also, you can check AWS connectors
> > for
> > > how to add a fail-fast mechanism to disable retrying all along.
> > >
> > > > FLIP-451 proposes timeout for retrying which helps with
> un-acknowledged
> > > > requests, but not addressing the case when request gets processed and
> > > > failed items keep failing no matter how many times we retry. Correct
> me
> > > if
> > > > I'm wrong
> > > >
> > > yes you are correct, this is mainly to mitigate the issues arising from
> > > incorrect handling of requests in sink implementers.
> > > The Failure handling itself has always been assumed to be the Sink
> > > implementation responsibility, this is done in 3 levels
> > > - Classifying Fatal exceptions as mentioned above
> > > - Adding configuration to disable retries as mentioned above as well.
> > > - Adding mechanism to limit retries as in the proposed ticket for AWS
> > > connectors[3]
> > >
> > > In my opinion at least 1 and 3 are useful in this case for
> Elasticsearch,
> > > Adding classifiers and retry mechanisms for elasticsearch.
> > >
> > > Or we can allow users to configure
> > > > "drop/fail" behavior for non-retriable errors
> > > >
> > >
> > > I am not sure I follow this proposal, but in general while "Dropping"
> > > records seems to boost reliability, it breaks the at-least-once
> semantics
> > > and if you don't have proper tracing and debugging mechanisms we will
> be
> > > shooting ourselves in the foot.
> > >
> > >
> > > 1-
> > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java
> > > 2-
> > >
> > >
> >
> https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L227
> > >
> > > 3-https://issues.apache.org/jira/browse/FLINK-35541
> > > Best Regards
> > > Ahmed Hamdy
> > >
> > >
> > > On Thu, 6 Jun 2024 at 06:53, Mingliang Liu <lium...@apache.org> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Currently the Elasticsearch 8 connector retries all items if the
> > request
> > > > fails as a whole, and retries failed items if the request has partial
> > > > failures [1]. I think this infinitely retries might be problematic in
> > > some
> > > > cases when retrying can never eventually succeed. For example, if the
> > > > request is 400 (bad request) or 404 (not found), retries do not help.
> > If
> > > > there are too many failed items non-retriable, new requests will get
> > > > processed less effectively. In extreme cases, it may stall the
> pipeline
> > > if
> > > > in-flight requests are occupied by those failed items.
> > > >
> > > > FLIP-451 proposes timeout for retrying which helps with
> un-acknowledged
> > > > requests, but not addressing the case when request gets processed and
> > > > failed items keep failing no matter how many times we retry. Correct
> me
> > > if
> > > > I'm wrong.
> > > >
> > > > One opinionated option is to fail fast for non-retriable errors like
> > 400
> > > /
> > > > 404 and to drop items for 409. Or we can allow users to configure
> > > > "drop/fail" behavior for non-retriable errors. I prefer the latter. I
> > > > checked how LogStash ingests data to Elasticsearch and it takes a
> > similar
> > > > approach for non-retriable errors [2]. In my day job, we have a
> > > > dead-letter-queue in AsynSinkWriter for failed entries that exhaust
> > > > retries. I guess that is too specific to our setup and seems an
> > overkill
> > > > here for Elasticsearch connector.
> > > >
> > > > Any thoughts on this?
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink-connector-elasticsearch/blob/5d1f8d03e3cff197ed7fe30b79951e44808b48fe/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L152-L170
> > > > [2]
> > > >
> > > >
> > >
> >
> https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/plugin_mixins/elasticsearch/common.rb#L283-L304
> > > >
> > >
> >
>

Reply via email to