Hi Shannon!

I was wondering if you still see this issue in Flink 1.1.4?

Another possible cause is a connection leak somewhere (in Flink code, user
code, or a vendor library) that starves the S3 connector's connection pool.
Flink 1.2 adds a safety net that tracks and closes connections that go
through Flink's filesystem abstraction, so that should no longer be an
issue there.
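
To illustrate what such a leak looks like, here is a minimal, self-contained
sketch. It does not use Flink or the EMRFS S3 connector: ConnectionPool and
PooledStream are hypothetical stand-ins that simulate a fixed-size pool with
a Semaphore, and the pool size of 50 only mirrors the default mentioned in
the original report. A stream that is never closed keeps its connection, so
the pool eventually times out, while try-with-resources hands it back.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PoolStarvationSketch {

    /** Hypothetical stand-in for a fixed-size HTTP connection pool. */
    static final class ConnectionPool {
        private final Semaphore permits;

        ConnectionPool(int maxConnections) {
            this.permits = new Semaphore(maxConnections);
        }

        /** Leases a connection, or throws if none is released within the timeout. */
        PooledStream open(long timeoutMillis) {
            try {
                if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("Timeout waiting for connection from pool");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for connection", e);
            }
            return new PooledStream(permits);
        }
    }

    /** A stream that holds one pooled connection until close() is called. */
    static final class PooledStream implements AutoCloseable {
        private final Semaphore permits;
        private boolean closed;

        PooledStream(Semaphore permits) {
            this.permits = permits;
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                permits.release(); // hand the connection back to the pool
            }
        }
    }

    /** Leaky usage: streams are opened but never closed. Returns false on pool timeout. */
    static boolean leakyReads(ConnectionPool pool, int reads) {
        try {
            for (int i = 0; i < reads; i++) {
                pool.open(50); // the returned stream is dropped without close()
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    /** Safe usage: try-with-resources closes every stream. Returns false on pool timeout. */
    static boolean safeReads(ConnectionPool pool, int reads) {
        try {
            for (int i = 0; i < reads; i++) {
                try (PooledStream stream = pool.open(50)) {
                    // read from the stream here
                }
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + leakyReads(new ConnectionPool(50), 51)); // prints "leaky: false"
        System.out.println("safe: " + safeReads(new ConnectionPool(50), 51));   // prints "safe: true"
    }
}
```

With a leak like this, raising the pool size only delays the timeout, which
would be consistent with going from 50 to 75 connections not helping.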

Best,
Stephan



On Fri, Jan 13, 2017 at 1:04 AM, Shannon Carey <sca...@expedia.com> wrote:

> Good to know someone else has had the same problem... What did you do
> about it? Did it resolve on its own?
>
> -Shannon
>
>
>
>
> On 1/12/17, 11:55 AM, "Chen Qin" <qinnc...@gmail.com> wrote:
>
> >We have seen this issue as far back as Flink 1.0. Our finding back then
> >was traffic congestion to AWS in our internal network. Many teams depend
> >heavily on S3 and the bandwidth is shared, which causes traffic
> >congestion from time to time.
> >
> >Hope it helps!
> >
> >Thanks
> >Chen
> >
> >> On Jan 12, 2017, at 03:30, Ufuk Celebi <u...@apache.org> wrote:
> >>
> >> Hey Shannon!
> >>
> >> Is this always reproducible and how long does it take to reproduce it?
> >>
> >> I've not seen this error before but as you say it indicates that some
> >> streams are not closed.
> >>
> >> Did the jobs do any restarts before this happened? Flink 1.1.4
> >> contains fixes for more robust releasing of resources in failure
> >> scenarios. Is trying 1.1.4 an option?
> >>
> >> – Ufuk
> >>
> >>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <sca...@expedia.com> wrote:
> >>> I'm having pretty frequent issues with the exception below. It basically
> >>> always ends up killing my cluster after forcing a large number of job
> >>> restarts. I just can't keep Flink up & running.
> >>>
> >>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
> >>> emrfs-site config fs.s3.maxConnections from the default (50) to 75, after
> >>> AWS support told me the name of the config option. However, that hasn't
> >>> fixed the problem. Assuming that increasing the maxConnections again
> >>> doesn't fix the problem, is there anything else I can do? Is anyone else
> >>> having this problem? Is it possible that the state backend isn't properly
> >>> calling close() on its filesystem objects? Or is there a large number of
> >>> concurrent open filesystem objects for some reason? I am using the default
> >>> checkpointing settings with one checkpoint at a time, checkpointing every
> >>> 10 minutes. If I am reading the metrics correctly, the checkpoint duration
> >>> is between 12s and 3 minutes on one of the jobs, and 5s or less on the
> >>> other 3. Any help is appreciated.
> >>>
> >>> java.lang.RuntimeException: Could not initialize state backend.
> >>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
> >>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
> >>>     at java.lang.Thread.run(Thread.java:745)
> >>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
> >>>     at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
> >>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> >>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >>>     at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
> >>>     at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
> >>>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
> >>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
> >>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
> >>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
> >>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
> >>>     ... 13 more
> >>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
> >>>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> >>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown Source)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
> >>>     ... 41 more
> >
>
