[ https://issues.apache.org/jira/browse/FLINK-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-14938:
-----------------------------------
    Labels: pull-request-available  (was: )

> Flink elasticsearch failure handler re-add indexrequest causes ConcurrentModificationException
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14938
>                 URL: https://issues.apache.org/jira/browse/FLINK-14938
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.8.1
>            Reporter: Shengnan YU
>            Assignee: Shengnan YU
>            Priority: Major
>              Labels: pull-request-available
>
>
> When using the Elasticsearch connector's failure handler (from the official example) to
> re-add documents, Flink encounters a ConcurrentModificationException.
> {code:java}
> input.addSink(new ElasticsearchSink<>(
>     config, transportAddresses,
>     new ElasticsearchSinkFunction<String>() {...},
>     new ActionRequestFailureHandler() {
>         @Override
>         public void onFailure(ActionRequest action,
>                 Throwable failure,
>                 int restStatusCode,
>                 RequestIndexer indexer) throws Throwable {
>             if (ExceptionUtils.findThrowable(failure,
>                     EsRejectedExecutionException.class).isPresent()) {
>                 // full queue; re-add document for indexing
>                 indexer.add(action);
>             }
>         }
>     }));
> {code}
> I found that the method BufferingNoOpRequestIndexer$processBufferedRequests
> iterates over a list of ActionRequest objects. However, the failure handler
> keeps re-adding requests to that list after a bulk request, which causes the
> ConcurrentModificationException.
> {code:java}
> void processBufferedRequests(RequestIndexer actualIndexer) {
>     for (ActionRequest request : bufferedRequests) {
>         if (request instanceof IndexRequest) {
>             actualIndexer.add((IndexRequest) request);
>         } else if (request instanceof DeleteRequest) {
>             actualIndexer.add((DeleteRequest) request);
>         } else if (request instanceof UpdateRequest) {
>             actualIndexer.add((UpdateRequest) request);
>         }
>     }
>     bufferedRequests.clear();
> }{code}
> I think this is a multi-threading bug. Would it be OK to use a concurrent
> queue to hold the failed requests?
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
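
The hazard described in the report, appending to a list while a for-each loop is iterating over it, can be reproduced outside Flink. The following is a minimal sketch under stated assumptions, not Flink's code and not the fix adopted for this issue: the class names `BufferedRequestsDemo`, `UnsafeBuffer`, and `SnapshotBuffer` are hypothetical, plain strings stand in for `ActionRequest`, and a `Consumer<String>` stands in for the `RequestIndexer`. The model is single-threaded, so it says nothing about the connector's threading. It only shows that the loop shape quoted above fails as soon as a callback re-adds to the same list, and that detaching the batch before iterating is one way to avoid that.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.function.Consumer;

/** Stand-alone model of a buffering indexer; strings stand in for ActionRequest. */
final class BufferedRequestsDemo {

    /** Same loop shape as the quoted method: iterates the live list, then clears it. */
    static final class UnsafeBuffer {
        private final List<String> bufferedRequests = new ArrayList<>();

        void add(String request) {
            bufferedRequests.add(request);
        }

        void processBufferedRequests(Consumer<String> actualIndexer) {
            for (String request : bufferedRequests) {
                // If this callback appends to bufferedRequests, the iterator fails fast.
                actualIndexer.accept(request);
            }
            bufferedRequests.clear();
        }
    }

    /** Hypothetical alternative: detach the current batch before iterating over it. */
    static final class SnapshotBuffer {
        // Not thread-safe; this sketch only addresses re-adds made during iteration.
        private List<String> bufferedRequests = new ArrayList<>();

        void add(String request) {
            bufferedRequests.add(request);
        }

        int size() {
            return bufferedRequests.size();
        }

        void processBufferedRequests(Consumer<String> actualIndexer) {
            List<String> batch = bufferedRequests;
            bufferedRequests = new ArrayList<>(); // re-adds land here, not in 'batch'
            for (String request : batch) {
                actualIndexer.accept(request);
            }
        }
    }

    /** Returns true if re-adding during processing throws ConcurrentModificationException. */
    static boolean unsafeBufferThrows() {
        UnsafeBuffer buffer = new UnsafeBuffer();
        buffer.add("doc-1");
        buffer.add("doc-2");
        try {
            // buffer::add plays the role of a failure handler calling indexer.add(action).
            buffer.processBufferedRequests(buffer::add);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Returns how many requests are buffered again after one processing pass. */
    static int snapshotBufferLeftover() {
        SnapshotBuffer buffer = new SnapshotBuffer();
        buffer.add("doc-1");
        buffer.add("doc-2");
        buffer.processBufferedRequests(buffer::add);
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("unsafe buffer threw CME: " + unsafeBufferThrows());
        System.out.println("requests re-queued by snapshot buffer: " + snapshotBufferLeftover());
    }
}
```

In this model the unsafe variant throws on the second iteration step, while the snapshot variant ends the pass with both requests buffered again for the next one. The concurrent queue proposed in the report is a different option: `java.util.concurrent.ConcurrentLinkedQueue` iterators are weakly consistent and do not throw `ConcurrentModificationException`, but they may also observe elements appended during the iteration, so the two approaches are not interchangeable.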