Hi, Kai,

I think the exception should be thrown from
RetryRejectedExecutionFailureHandler as you configure the
'failure-handler' to 'retry-rejected'. It will retry the action that
failed with EsRejectedExecutionException and throw all other failures.

AFAIK, there is no way to configure the connection/socket timeout in
Elasticsearch SQL connector. However, if the root cause is a network
jitter, you may increase the sink.bulk-flush.backoff.delay and the
sink.bulk-flush.backoff.max-retries.


Best,
Yangze Guo

On Sat, Jun 5, 2021 at 2:28 PM Kai Fu <zzfu...@gmail.com> wrote:
>
> With some investigation in the task manager's log, the exception was raised 
> from RetryRejectedExecutionFailureHandler path, the related logs are showing 
> below, not sure why it's that.
>
>
> 5978 2021-06-05 05:31:31,529 INFO 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler
>  [] - Bulk request 1033 has been cancelled.
> 5979 java.lang.InterruptedException: null
> 5980 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>  ~[?:1.8.0_272]
> 5981 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  ~[?:1.8.0_272]
> 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
> ~[?:1.8.0_272]
> 5983 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1]
> 5984 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5985 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5986 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1]
> 5987 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5988 at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5989 at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5990 at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5991 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5992 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5993 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5994 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5995 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
> 5999 2021-06-05 05:31:31,530 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
>  [] - Failed Elasticsearch item request: null
> 6000 java.lang.InterruptedException: null
> 6001 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>  ~[?:1.8.0_272]
> 6002 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  ~[?:1.8.0_272]
> 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
> ~[?:1.8.0_272]
> 6004 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1]
> 6005 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6006 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6007 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13. 1]
> 6008 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6009 at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6010 at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6011 at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6012 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6013 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6014 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6015 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6016 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6017 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
>
> 6030 2021-06-05 05:31:31,633 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
>  [] - Failed Elasticsearch item request: Connection closed unexpectedly
> 6031 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException:
>  Connection closed unexpectedly
> 6032 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146)
>  [flink-sql-connector-elasticsearch7_2.11- 1.13.1.jar:1.13.1]
> 6033 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1]
> 6034 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1]
> 6035 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100)
>  [flink-sql-connector-elasticsearch7_2.11-1. 13.1.jar:1.13.1]
> 6036 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar: 1.13.1]
> 6037 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449)
>  [flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6038 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1. jar:1.13.1]
> 6039 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6040 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
>  [flink-sql-connector- elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6041 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
>
> On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <zzfu...@gmail.com> wrote:
>>
>> Hi team,
>>
>> We encountered an issue about ES sink connector timeout quite frequently. As 
>> checked the ES cluster is far from being loaded(~40% CPU utilization, no 
>> query, index rate is also low). We're using ES-7 connector, with 12 data 
>> nodes and parallelism of 32.
>>
>> The error log is as below, we want to know if there is any way to locate the 
>> issue or configure the timeout parameter.
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
>>
>> 2021-06-05 11:49:10
>> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
>>     at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
>>     at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
>>     at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
>>     at 
>> org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>     at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at 
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>     at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at 
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>>     at 
>> org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
>>     at 
>> org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
>>     at 
>> org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
>>     at 
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>>     at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on 
>> connection http-outgoing-21 [ACTIVE]
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
>>     ... 1 more
>>
>> Config:
>> WITH (
>>     'connector' = 'elasticsearch-7',
>>     'hosts' = 'https://xxx:443',
>>     'index' = 'xxx',
>>     'sink.bulk-flush.max-actions' = '10000',
>>     'sink.bulk-flush.max-size' = '2mb',
>>     'sink.flush-on-checkpoint' = 'true',
>>     'connection.max-retry-timeout' = '60s',
>>     'failure-handler' = 'retry-rejected',
>>     'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
>>     'sink.bulk-flush.interval' = '2s'
>> );
>>
>> --
>> Best wishes,
>> - Kai
>
>
>
> --
> Best wishes,
> - Kai

Reply via email to