[ https://issues.apache.org/jira/browse/FLINK-20641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Echo Lee closed FLINK-20641.
----------------------------
    Resolution: Not A Bug

> flink-connector-elasticsearch6 will deadlock
> --------------------------------------------
>
>                 Key: FLINK-20641
>                 URL: https://issues.apache.org/jira/browse/FLINK-20641
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / ElasticSearch
>    Affects Versions: 1.11.1
>            Reporter: Echo Lee
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.15.0, 1.14.3
>
>         Attachments: jstack
>
>
> flink version: 1.11.1
> elasticsearch connector version: 6.3.1
> My job graph is [kafkaSource --> map --> elasticsearchSink]. When I set a
> larger degree of parallelism, stream processing stops. I know ES has a known
> issue [47599|https://github.com/elastic/elasticsearch/issues/47599]; it
> creates an unexpected risk of deadlock when using
> flink-connector-elasticsearch6.
>
> TaskManager stack is:
> [link title|http://example.com/][^jstack]
>
> TaskManager log is:
> {code:java}
> 2020-12-16 14:36:35,291 ERROR xxx.ActionRequestFailureHandler [] - Sink to es exception ,exceptionData: index {[full_link_apm_span-2020-12-16][apm][null], source[n/a, actual length: [5.8kb], max length: 2kb]} ,exceptionStackTrace: java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
>     at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
>     at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
>     at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
>     at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
>     at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
>     at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
>     at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:72)
>     at com.hundsun.flink.util.ElasticSearchSinkUtil$1.process(ElasticSearchSinkUtil.java:59)
>     at com.hundsun.flink.util.ElasticSearchSinkUtil$1.process(ElasticSearchSinkUtil.java:47)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
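
For readers of the trace above: its top frames show the sink thread waiting in CountDownLatch.await (called from BulkRequestHandler.execute) and leaving that wait with java.lang.InterruptedException. The following is only a JDK-level sketch of that wait-then-interrupt pattern. The class name LatchInterruptDemo and the method blockAndInterrupt are hypothetical, and nothing here is taken from the Flink connector or the Elasticsearch client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of Flink or Elasticsearch.
public class LatchInterruptDemo {

    // Starts a worker that waits on a latch nobody counts down, then
    // interrupts the worker and reports how its wait ended.
    static String blockAndInterrupt() throws InterruptedException {
        CountDownLatch neverReleased = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("still blocked");

        Thread worker = new Thread(() -> {
            try {
                // Without an interrupt this call would wait forever,
                // because countDown() is never invoked.
                neverReleased.await();
                outcome.set("released");
            } catch (InterruptedException e) {
                // await() throws whether the interrupt arrives before
                // or during the wait.
                outcome.set("interrupted");
            }
        });

        worker.start();
        worker.interrupt();
        worker.join(); // join() makes the worker's write to outcome visible here
        return outcome.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(blockAndInterrupt());
    }
}
```

In this sketch the only way out of the wait is the interrupt, which matches the shape of the logged exception; whether that is what happened in the reported job depends on the attached jstack output and the linked Elasticsearch issue.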