In the flink-es connector 6.*, you can set the socket timeout by implementing a
customized RestClientFactory and registering it on the sink builder via
setRestClientFactory. Here is the code snippet.

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder
                .setRequestConfigCallback(new ElasticSearchRequestConfigCallback());
    }

    class ElasticSearchRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {
        @Override
        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
            // requestTimeout: the desired socket timeout in milliseconds,
            // e.g. a field of the enclosing RestClientFactory implementation
            return builder.setSocketTimeout(requestTimeout);
        }
    }


The default socket timeout is 30 seconds, which should be OK for most cases. So
if you find that the ES load is normal but the responses are slow or the number
of rejected requests is high, you should probably check:
1) whether the number of concurrent ES bulk requests is too high;
2) whether the bulk size is too big;
3) whether too many indexes are included in one bulk request.
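
For intuition, here is a tiny illustrative helper (hypothetical, not part of
Flink; the class and method names are made up) showing how a bulk buffer only
flushes once one of its thresholds trips, so large max-actions/max-size
settings translate directly into large, slow bulk requests:

```java
// Hypothetical helper, NOT Flink API: mimics the two bulk-flush thresholds.
class BulkFlushPolicy {
    final int maxActions;   // cf. sink.bulk-flush.max-actions
    final long maxBytes;    // cf. sink.bulk-flush.max-size

    BulkFlushPolicy(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    // Flush as soon as either the action count or the byte size threshold trips.
    boolean shouldFlush(int bufferedActions, long bufferedBytes) {
        return bufferedActions >= maxActions || bufferedBytes >= maxBytes;
    }
}
```

With max-actions = 10000 and max-size = 2 MB (the settings from the original
report further down the thread), a single flush can carry up to 10,000
actions, which is where a 30-second socket timeout starts to bite.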


BR,
Jacky
________________________________
From: Yangze Guo <karma...@gmail.com>
Sent: June 7, 2021 11:41
To: Kai Fu <zzfu...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Elasticsearch sink connector timeout

Hi, Kai,

I think the exception should be thrown from
RetryRejectedExecutionFailureHandler, as you have configured
'failure-handler' to 'retry-rejected'. It will retry any action that
failed with an EsRejectedExecutionException and throw all other failures.
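
That dispatch can be sketched like this (hypothetical stand-in types for
illustration only; the real connector implements ActionRequestFailureHandler
and re-adds the action through a RequestIndexer):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of the 'retry-rejected' contract: re-queue an action when
// Elasticsearch rejected it for capacity reasons, rethrow everything else.
class RetryRejectedDemo {
    // Stand-in for EsRejectedExecutionException.
    static class RejectedException extends RuntimeException {}

    final Deque<String> retryQueue = new ArrayDeque<>();

    void onFailure(String action, Throwable failure) throws Throwable {
        if (failure instanceof RejectedException) {
            retryQueue.add(action); // rejected by ES: retry the action later
        } else {
            throw failure; // any other failure propagates and fails the sink
        }
    }
}
```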

AFAIK, there is no way to configure the connection/socket timeout in the
Elasticsearch SQL connector. However, if the root cause is network
jitter, you may increase sink.bulk-flush.backoff.delay and
sink.bulk-flush.backoff.max-retries.
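
As a rough sketch of what those two settings control under the EXPONENTIAL
strategy (an illustration of the behavior, not the connector's actual code):
the wait before each retry roughly doubles, and retries stop once the
max-retries budget is exhausted.

```java
// Hypothetical demo class: models an exponential backoff schedule.
class BackoffDemo {
    // Wait in milliseconds before the given 0-based retry attempt,
    // or -1 once the retry budget (cf. backoff.max-retries) is exhausted.
    static long exponentialDelayMillis(long baseDelayMillis, int maxRetries, int attempt) {
        if (attempt >= maxRetries) {
            return -1; // give up; the failure is surfaced to the failure handler
        }
        return baseDelayMillis * (1L << attempt); // delay * 2^attempt
    }
}
```

So raising backoff.delay and backoff.max-retries gives a transient network
problem more total time to clear before the failure is surfaced.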


Best,
Yangze Guo

On Sat, Jun 5, 2021 at 2:28 PM Kai Fu <zzfu...@gmail.com> wrote:
>
> With some investigation into the task manager's log, the exception was raised
> from the RetryRejectedExecutionFailureHandler path; the related logs are shown
> below, though it is not clear why.
>
>
> 5978 2021-06-05 05:31:31,529 INFO 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler
>  [] - Bulk request 1033 has been cancelled.
> 5979 java.lang.InterruptedException: null
> 5980 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>  ~[?:1.8.0_272]
> 5981 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  ~[?:1.8.0_272]
> 5982 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
> ~[?:1.8.0_272]
> 5983 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5984 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5985 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5986 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5987 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5988 at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 5989 at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5990 at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5991 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 5992 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5993 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5994 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5995 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5996 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5997 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> 5998 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
> 5999 2021-06-05 05:31:31,530 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
>  [] - Failed Elasticsearch item request: null
> 6000 java.lang.InterruptedException: null
> 6001 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>  ~[?:1.8.0_272]
> 6002 at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  ~[?:1.8.0_272]
> 6003 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) 
> ~[?:1.8.0_272]
> 6004 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:78)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6005 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:455)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6006 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:464)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6007 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.awaitClose(BulkProcessor.java:330)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6008 at 
> org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.bulk.BulkProcessor.close(BulkProcessor.java:300)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6009 at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:354)
>  ~[flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6010 at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6011 at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6012 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:861)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
> 6013 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:840)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6014 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:753)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6015 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:659)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6016 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  [flink-dist_2.11-1.13.1.jar:1.13.1]
> 6017 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> [flink-dist_2.11-1.13.1.jar:1.13.1]
>
> 6030 2021-06-05 05:31:31,633 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
>  [] - Failed Elasticsearch item request: Connection closed unexpectedly
> 6031 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException:
>  Connection closed unexpectedly
> 6032 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6033 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6034 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6035 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6036 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6037 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6038 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:283)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6039 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6040 at 
> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
>  [flink-sql-connector-elasticsearch7_2.11-1.13.1.jar:1.13.1]
> 6041 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
>
> On Sat, Jun 5, 2021 at 12:13 PM Kai Fu <zzfu...@gmail.com> wrote:
>>
>> Hi team,
>>
>> We encounter an ES sink connector timeout issue quite frequently. As
>> checked, the ES cluster is far from being loaded (~40% CPU utilization, no
>> query load, index rate is also low). We're using the ES-7 connector, with 12
>> data nodes and a parallelism of 32.
>>
>> The error log is below; we want to know if there is any way to locate the
>> issue or configure the timeout parameter.
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
>>
>> 2021-06-05 11:49:10
>> java.lang.RuntimeException: An error occurred in ElasticsearchSink.
>>     at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:427)
>>     at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:432)
>>     at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:329)
>>     at 
>> org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>     at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at 
>> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>     at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at 
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at 
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
>>     at 
>> org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog(DeduplicateFunctionHelper.java:112)
>>     at 
>> org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:80)
>>     at 
>> org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepLastRowFunction.processElement(ProcTimeDeduplicateKeepLastRowFunction.java:32)
>>     at 
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>>     at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>     at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on 
>> connection http-outgoing-21 [ACTIVE]
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>     at 
>> org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
>>     ... 1 more
>>
>> Config:
>> WITH (
>>     'connector' = 'elasticsearch-7',
>>     'hosts' = 'https://xxx:443',
>>     'index' = 'xxx',
>>     'sink.bulk-flush.max-actions' = '10000',
>>     'sink.bulk-flush.max-size' = '2mb',
>>     'sink.flush-on-checkpoint' = 'true',
>>     'connection.max-retry-timeout' = '60s',
>>     'failure-handler' = 'retry-rejected',
>>     'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
>>     'sink.bulk-flush.interval' = '2s'
>> );
>>
>> --
>> Best wishes,
>> - Kai
>
>
>
> --
> Best wishes,
> - Kai
