Hi Ren,

I think the root cause is that you didn't set a proper failure handler for the Elasticsearch connector. The `RetryRejectedExecutionFailureHandler` should resolve your issue; see the Elasticsearch connector docs [1] for more information. If you're using Flink SQL rather than a DataStream application, you can instead set 'connector.failure-handler' = 'retry-rejected' in your Elasticsearch table DDL.
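If you are on the DataStream API, the handler is set on the ElasticsearchSink builder. A minimal sketch for the elasticsearch7 connector follows; the host, index name, and element type are placeholders, and kafkaStream stands for whatever DataStream you get from your Kafka source, not your exact job:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import java.util.Collections;
import java.util.List;

List<HttpHost> httpHosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"));  // placeholder host

ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(Requests.indexRequest()
                        .index("my-index")  // placeholder index name
                        .source(Collections.singletonMap("data", element)));
            }
        });

// Re-enqueue bulk requests that failed with es_rejected_execution_exception instead of failing the sink.
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
// Smaller bulks give the cluster a chance to catch up between retries.
esSinkBuilder.setBulkFlushMaxActions(100);

kafkaStream.addSink(esSinkBuilder.build());

Note that retrying rejected bulk requests applies backpressure upstream, so the job slows down while the ES cluster is overloaded instead of failing.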
Btw, please use English on user@flink.apache.org, or send Chinese emails to user...@flink.apache.org, for better communication.

Best,
Leonard

[1] https://nightlies.apache.org/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html

> On Dec 3, 2021, at 2:26 PM, 淘宝龙安 <rentb...@gmail.com> wrote:
>
> hi, all
>
> I've run into a problem that is very hard to resolve. My scenario is very simple and common: consume data from Kafka and write it to ES. But when the ES cluster is under heavy load and starts rejecting writes (es_rejected_execution_exception), the whole Flink job hangs: it no longer consumes data, it does not restart, and all checkpoints fail. I've found some clues but still can't locate where the problem is.
>
> Symptoms:
> When the write rejection happens, Flink logs the following error on the TaskManager. After that, the whole job is stuck and no longer consumes data, regardless of whether the ES cluster has recovered.
>
> 2021-10-11 08:07:28,804 I/O dispatcher 6 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase - Failed Elasticsearch item request: ElasticsearchException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of processing of [1477079498][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[acs_period_stl_index_2021_03_30_19_06_21][0]] containing [100] requests, target allocation id: rnOmk_VtSgGAOT8e0dCefQ, primary term: 1 on EsThreadPoolExecutor[name = VECS014179/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@640a33fe[Running, pool size = 8, active threads = 8, queued tasks = 226, completed tasks = 1171462808]]]]
> ElasticsearchException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of processing of [1477079498][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[acs_period_stl_index_2021_03_30_19_06_21][0]] containing [100] requests, target allocation id: rnOmk_VtSgGAOT8e0dCefQ, primary term: 1 on EsThreadPoolExecutor[name = VECS014179/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@640a33fe[Running, pool size = 8, active threads = 8, queued tasks = 226, completed tasks = 1171462808]]]]
> at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:491)
> at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:402)
> at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:139)
> at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:199)
> at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1727)
> at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1520)
> at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1598)
> at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:556)
> at org.elasticsearch.client.RestClient$1.completed(RestClient.java:300)
> at org.elasticsearch.client.RestClient$1.completed(RestClient.java:294)
> at com.huster.hidden.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
> at com.huster.hidden.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
> at com.huster.hidden.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
> at com.huster.hidden.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
> at com.huster.hidden.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
> at com.huster.hidden.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
> at com.huster.hidden.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
> at com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
> at com.huster.hidden.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
> at com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
> at com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
> at com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
> at com.huster.hidden.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
> at com.huster.hidden.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
> at java.lang.Thread.run(Thread.java:748)
>
> Checkpoint configuration:
> <image.png>
>
> All checkpoints fail:
> <image.png>
>
> I'm using Flink 1.10.0 with the ES connector flink-connector-elasticsearch7_2.11, v1.10.0. The ES cluster is 7.5.2 or 7.15.0 (I've hit this on both), and it's not just my code; other people have run into the same problem.
>
> When the job is deployed on a Flink YARN cluster it reproduces 100% of the time, but when I debug it locally in IDEA it only happens occasionally and is not 100% reproducible.
>
> The jstack taken while the job was hung is in the attachment. The debug log from the last time it died:
>
> 2021-12-02 22:06:55.627 [Checkpoint Timer] INFO o.a.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 20 @ 1638454015627 for job d5a71c0aca220cf880f24bbac4fc7c8f.
> 2021-12-02 22:06:55.628 [flink-akka.actor.default-dispatcher-62] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Trigger checkpoint 20@1638454015627 for e2fa126a099ade3e581253235ffdc797.
> 2021-12-02 22:06:59.946 [Flink-DispatcherRestEndpoint-thread-2] DEBUG o.a.f.r.r.handler.legacy.metrics.MetricFetcherImpl - Start fetching metrics.
> 2021-12-02 22:06:59.947 [Flink-DispatcherRestEndpoint-thread-4] DEBUG o.a.f.r.r.handler.legacy.metrics.MetricFetcherImpl - Retrieve metric query service gateway for akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService
> 2021-12-02 22:06:59.947 [Flink-DispatcherRestEndpoint-thread-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService - Try to connect to remote RPC endpoint with address akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService. Returning a org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway gateway.
> 2021-12-02 22:06:59.951 [Flink-DispatcherRestEndpoint-thread-3] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcService - Try to connect to remote RPC endpoint with address akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService. Returning a org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway gateway.
> 2021-12-02 22:06:59.953 [Flink-DispatcherRestEndpoint-thread-2] DEBUG o.a.f.r.r.handler.legacy.metrics.MetricFetcherImpl - Query metrics for akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService.
> 2021-12-02 22:06:59.953 [Flink-DispatcherRestEndpoint-thread-4] DEBUG o.a.f.r.r.handler.legacy.metrics.MetricFetcherImpl - Query metrics for akka.tcp://flink-metrics@172.19.179.143:58419/user/MetricQueryService.
> 2021-12-02 22:07:00.113 [flink-rest-server-netty-worker-thread-14] DEBUG o.a.f.r.r.h.legacy.files.StaticFileServerHandler - Responding 'NOT MODIFIED' for file '/private/var/folders/n4/sj54y_5d1cj1wd0h_kgz1z4r0000gn/T/flink-web-ui/index.html'
> 2021-12-02 22:07:04.145 [flink-akka.actor.default-dispatcher-60] DEBUG o.a.f.r.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
> 2021-12-02 22:07:04.149 [flink-akka.actor.default-dispatcher-68] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from 2859ebe1eeb6acc39673028ca5f90b2b.
> 2021-12-02 22:07:04.151 [flink-akka.actor.default-dispatcher-60] DEBUG o.a.f.r.resourcemanager.StandaloneResourceManager - Received heartbeat from b2cbed7e-3290-4bd3-b24e-2ad1cf07284d.
> 2021-12-02 22:07:04.151 [flink-akka.actor.default-dispatcher-60] DEBUG o.a.f.r.r.slotmanager.SlotManagerImpl - Received slot report from instance 6b5347bb15142ab9c92fb761ff8d9315: SlotReport{slotsStatus=[SlotStatus{slotID=b2cbed7e-3290-4bd3-b24e-2ad1cf07284d_0, resourceProfile=ResourceProfile{managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, allocationID=2a311c55f742a7a398260555f9d5c775, jobID=d5a71c0aca220cf880f24bbac4fc7c8f}]}.
> 2021-12-02 22:07:04.225 [flink-akka.actor.default-dispatcher-60] DEBUG o.a.f.r.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
> 2021-12-02 22:07:04.225 [flink-akka.actor.default-dispatcher-60] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat request from 2859ebe1eeb6acc39673028ca5f90b2b.
> 2021-12-02 22:07:04.225 [flink-akka.actor.default-dispatcher-68] DEBUG o.a.f.r.resourcemanager.StandaloneResourceManager - Received heartbeat from 1d01cc7299b415750d4e3ca23134e760.
> 2021-12-02 22:07:04.734 [flink-akka.actor.default-dispatcher-60] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
> 2021-12-02 22:07:04.735 [flink-akka.actor.default-dispatcher-68] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from 1d01cc7299b415750d4e3ca23134e760.
> 2021-12-02 22:07:04.738 [flink-akka.actor.default-dispatcher-68] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from b2cbed7e-3290-4bd3-b24e-2ad1cf07284d.
> 2021-12-02 22:07:13.386 [flink-akka.actor.default-dispatcher-67] DEBUG o.a.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Slot Pool Status:
> status: connected to akka://flink/user/resourcemanager
> registered TaskManagers: [b2cbed7e-3290-4bd3-b24e-2ad1cf07284d]
> available slots: []
> allocated slots: [[AllocatedSlot 2a311c55f742a7a398260555f9d5c775 @ b2cbed7e-3290-4bd3-b24e-2ad1cf07284d @ localhost (dataPort=-1) - 0]]
> pending requests: []
>
> As you can see, this is what it looks like under normal conditions. When the job hangs, it prints "2021-12-02 22:06:55.628 [flink-akka.actor.default-dispatcher-62] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Trigger checkpoint 20@1638454015627 for e2fa126a099ade3e581253235ffdc797." and after that log line there is no further output at all.
> <image.png>
>
> At the moment I can't reproduce it 100% of the time, and the checkpoint code is very hard to trace and debug because it uses Akka, which makes finding the problem quite difficult. Could anyone familiar with this part help analyze where exactly the program is getting stuck? Or have you run into the same problem?
> <jstack>