[ 
https://issues.apache.org/jira/browse/FLINK-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14938:
-----------------------------------
    Labels: pull-request-available  (was: )

> Flink elasticsearch failure handler re-add indexrequest causes 
> ConcurrentModificationException
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14938
>                 URL: https://issues.apache.org/jira/browse/FLINK-14938
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.8.1
>            Reporter: Shengnan YU
>            Assignee: Shengnan YU
>            Priority: Major
>              Labels: pull-request-available
>
>  
> When use Elasticsearch connector failure handler (from official example) to 
> re-add documents, Flink encountered ConcurrentModificationException.
> {code:java}
> input.addSink(new ElasticsearchSink<>(
>     config, transportAddresses,
>     new ElasticsearchSinkFunction<String>() {...},
>     new ActionRequestFailureHandler() {
>         @Override
>         void onFailure(ActionRequest action,
>                 Throwable failure,
>                 int restStatusCode,
>                 RequestIndexer indexer) throw Throwable {
>             if (ExceptionUtils.findThrowable(failure, 
> EsRejectedExecutionException.class).isPresent()) {
>                 // full queue; re-add document for indexing
>                 indexer.add(action);
>             }
>         }
> }));
> {code}
> I found that in method BufferingNoOpRequestIndexer$processBufferedRequests, 
> it will iterator a list of ActionRequest. However the failure handler will 
> keep re-adding request to that list after bulk, which causes 
> ConcurrentModificationException.
> {code:java}
> void processBufferedRequests(RequestIndexer actualIndexer) {
>    for (ActionRequest request : bufferedRequests) {
>       if (request instanceof IndexRequest) {
>          actualIndexer.add((IndexRequest) request);
>       } else if (request instanceof DeleteRequest) {
>          actualIndexer.add((DeleteRequest) request);
>       } else if (request instanceof UpdateRequest) {
>          actualIndexer.add((UpdateRequest) request);
>       }
>    }
>    bufferedRequests.clear();
> }{code}
> I think it should be a multi-thread bug and is it ok to use concurrent queue 
> to maintain the failure request?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to