Hi Kai,

I think the exception is expected to be thrown from RetryRejectedExecutionFailureHandler, since you set 'failure-handler' to 'retry-rejected'. That handler retries actions that failed with EsRejectedExecutionException and rethrows all other failures.

AFAIK, there is no way to configure the connection/socket timeout in the Elasticsearch SQL connector. However, if the root cause is network jitter, you may increase 'sink.bulk-flush.backoff.delay' and 'sink.bulk-flush.backoff.max-retries'.
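For example, a minimal sketch of how those options could be added to your existing WITH clause (the '1s' and '8' values are only placeholders for illustration, not recommendations; tune them to your workload):

    'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
    'sink.bulk-flush.backoff.delay' = '1s',
    'sink.bulk-flush.backoff.max-retries' = '8',

With the EXPONENTIAL strategy, 'sink.bulk-flush.backoff.delay' is the initial base delay and the wait grows with each attempt, so raising 'sink.bulk-flush.backoff.max-retries' gives the cluster considerably more time to recover before a bulk request finally fails.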
Best,
Yangze Guo

On Sat, Jun 5, 2021 at 2:28 PM Kai Fu <zzfu...@gmail.com> wrote:
>
> With some investigation into the task manager's log, the exception was raised from the RetryRejectedExecutionFailureHandler path; the related logs are shown below, not sure why that is.
>
> 5978 2021-06-05 05:31:31,529 INFO org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler [] - Bulk request 1033 has been cancelled.
> 5979 java.lang.InterruptedException: null
> 5980 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
> 5981 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
> 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
> 5983 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5984 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5985 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5986 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5987 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5988 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5989 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5990 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5991 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5992 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5993 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5994 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5995 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
> 5999 2021-06-05 05:31:31,530 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: null
> 6000 java.lang.InterruptedException: null
> 6001 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[?:1.8.0_272]
> 6002 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[?:1.8.0_272]
> 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[?:1.8.0_272]
> 6004 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6005 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6006 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6007 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6008 at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6009 at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354) ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6010 at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6011 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6012 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6013 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6014 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6015 at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6016 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6017 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [flink-dist_2.11-1.13.1.jar:1.13.1]
>
> 6030 2021-06-05 05:31:31,633 ERROR org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler [] - Failed Elasticsearch item request: Connection closed unexpectedly
> 6031 org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection closed unexpectedly
> 6032 at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6033 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6034 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6035 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6036 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6037 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6038 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6039 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6040 at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6041 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
>
> On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <zzfu...@gmail.com> wrote:
>>
>> Hi team,
>>
>> We encounter an ES sink connector timeout issue quite frequently. As far as we checked, the ES cluster is far from being loaded (~40% CPU utilization, no queries, and the index rate is also low). We're using the ES-7 connector, with 12 data nodes and a parallelism of 32.
>>
>> The error log is below; we want to know if there is any way to locate the issue or configure the timeout parameter [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
>>
>> 2021-06-05 11:49:10
>> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
>> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
>> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
>> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
>> at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>> at org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
>> at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
>> at org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
>> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-21 [ACTIVE]
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>> at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
>> ... 1 more
>>
>> Config:
>> WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'https://xxx:443',
>>   'index' = 'xxx',
>>   'sink.bulk-flush.max-actions' = '10000',
>>   'sink.bulk-flush.max-size' = '2mb',
>>   'sink.flush-on-checkpoint' = 'true',
>>   'connection.max-retry-timeout' = '60s',
>>   'failure-handler' = 'retry-rejected',
>>   'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
>>   'sink.bulk-flush.interval' = '2s'
>> );
>>
>> --
>> Best wishes,
>> - Kai
>
> --
> Best wishes,
> - Kai