[ https://issues.apache.org/jira/browse/FLINK-14938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093058#comment-17093058 ]
Hwanju Kim commented on FLINK-14938:
------------------------------------

[~ysn2233] - I have a question about your proposed solution. Did you take any
performance measurements that led you to that hybrid solution? IMO, since this
Elasticsearch sink path performs I/O against external services, any CPU penalty
incurred by a concurrency-aware data structure is likely to be dwarfed by the
much higher I/O latency, so using ConcurrentLinkedQueue seems fine to me, at
least in terms of latency. It may add some CPU cost, but I am not sure how
significant that is quantitatively. We are also looking forward to a resolution
of this problem, but wanted to check whether you have a performance test that
shows whether such a cost matters.

> Flink elasticsearch failure handler re-add indexrequest causes ConcurrentModificationException
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14938
>                 URL: https://issues.apache.org/jira/browse/FLINK-14938
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.8.1
>            Reporter: Shengnan YU
>            Assignee: Shengnan YU
>            Priority: Major
>
>
> When using the Elasticsearch connector's failure handler (from the official
> example) to re-add documents, Flink encounters a ConcurrentModificationException.
> {code:java}
> input.addSink(new ElasticsearchSink<>(
>     config, transportAddresses,
>     new ElasticsearchSinkFunction<String>() {...},
>     new ActionRequestFailureHandler() {
>         @Override
>         public void onFailure(ActionRequest action,
>                 Throwable failure,
>                 int restStatusCode,
>                 RequestIndexer indexer) throws Throwable {
>             if (ExceptionUtils.findThrowable(failure,
>                     EsRejectedExecutionException.class).isPresent()) {
>                 // full queue; re-add document for indexing
>                 indexer.add(action);
>             }
>         }
> }));
> {code}
> I found that BufferingNoOpRequestIndexer#processBufferedRequests iterates over
> a list of ActionRequests.
> However, the failure handler keeps re-adding requests to that list after a
> bulk request, which causes a ConcurrentModificationException.
> {code:java}
> void processBufferedRequests(RequestIndexer actualIndexer) {
>     // bufferedRequests is the list the failure handler appends to
>     for (ActionRequest request : bufferedRequests) {
>         if (request instanceof IndexRequest) {
>             actualIndexer.add((IndexRequest) request);
>         } else if (request instanceof DeleteRequest) {
>             actualIndexer.add((DeleteRequest) request);
>         } else if (request instanceof UpdateRequest) {
>             actualIndexer.add((UpdateRequest) request);
>         }
>     }
>     bufferedRequests.clear();
> }
> {code}
> I think this is a multi-threading bug. Is it OK to use a concurrent queue to
> hold the failed requests?
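To make the failure mode in the quoted description concrete, here is a minimal, self-contained Java sketch. It is not Flink code: plain Strings stand in for ActionRequests, and the class and method names (BufferedRequestsDemo, reAddDuringIterationThrows, drainWhileProducing, drain) are hypothetical. The first method reproduces the reported pattern deterministically on a single thread (a for-each over an ArrayList while a stand-in "handler" appends to it). The second assumes, as the reporter suspects, that the failure handler and processBufferedRequests run on different threads, and shows the ConcurrentLinkedQueue variant discussed in the comment, drained with poll() rather than iterated and then cleared.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of FLINK-14938; Strings stand in for ActionRequests.
public class BufferedRequestsDemo {

    // Mirrors the reported pattern: iterate an ArrayList while a "handler" re-adds to it.
    public static boolean reAddDuringIterationThrows() {
        List<String> bufferedRequests = new ArrayList<>(Arrays.asList("req-1", "req-2"));
        try {
            for (String request : bufferedRequests) {
                if (request.equals("req-1")) {
                    // Stand-in for the failure handler calling indexer.add(action).
                    bufferedRequests.add("req-1-retry");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true; // the ArrayList iterator detected the structural change
        }
    }

    // Assumes (hypothetically) the handler runs on a different thread than the consumer.
    // Returns how many requests the consumer handed on; none are lost or duplicated.
    public static int drainWhileProducing(int producedCount) {
        ConcurrentLinkedQueue<String> bufferedRequests = new ConcurrentLinkedQueue<>();

        // Stand-in for the thread that invokes the failure handler.
        Thread failureHandler = new Thread(() -> {
            for (int i = 0; i < producedCount; i++) {
                bufferedRequests.add("retry-" + i);
            }
        });
        failureHandler.start();

        // Stand-in for processBufferedRequests, called repeatedly on the sink thread.
        int processed = 0;
        while (failureHandler.isAlive()) {
            processed += drain(bufferedRequests);
        }
        try {
            failureHandler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Final pass picks up anything added after the last drain inside the loop.
        processed += drain(bufferedRequests);
        return processed;
    }

    // poll() removes each request as it is consumed, so no trailing clear() is needed.
    private static int drain(ConcurrentLinkedQueue<String> queue) {
        int count = 0;
        while (queue.poll() != null) {
            count++; // stand-in for actualIndexer.add(request)
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("ArrayList re-add throws CME: " + reAddDuringIterationThrows());
        System.out.println("Requests processed from queue: " + drainWhileProducing(100_000));
    }
}
{code}

Running main prints true for the ArrayList case and 100000 for the queue case. ConcurrentLinkedQueue iterators are weakly consistent, so a plain for-each over the queue would not throw either; draining with poll() additionally avoids silently dropping a request that is appended between the end of the loop and a trailing clear(). The sketch says nothing about the CPU cost asked about in the comment; that would still need a real benchmark.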