Hi Shannon! I was wondering if you still see this issue in Flink 1.1.4?
Just a thought: another possible cause could be a connection leak somewhere (in Flink code, user code, or a vendor library) that starves the S3 connector's connection pool. In Flink 1.2 there is a safety net that tracks and closes connections that go through Flink's filesystem abstraction, so that should no longer be an issue there. A minimal sketch of that kind of leak is appended below the quoted trace.

Best,
Stephan

On Fri, Jan 13, 2017 at 1:04 AM, Shannon Carey <sca...@expedia.com> wrote:

> Good to know someone else has had the same problem... What did you do about it? Did it resolve on its own?
>
> -Shannon
>
> On 1/12/17, 11:55 AM, "Chen Qin" <qinnc...@gmail.com> wrote:
>
> > We have seen this issue going back to Flink 1.0. Our finding back then was traffic congestion to AWS in the internal network. Many teams depend heavily on S3 and the bandwidth is shared, which causes congestion from time to time.
> >
> > Hope it helps!
> >
> > Thanks
> > Chen
> >
> >> On Jan 12, 2017, at 03:30, Ufuk Celebi <u...@apache.org> wrote:
> >>
> >> Hey Shannon!
> >>
> >> Is this always reproducible, and how long does it take to reproduce?
> >>
> >> I've not seen this error before, but as you say, it indicates that some streams are not closed.
> >>
> >> Did the jobs do any restarts before this happened? Flink 1.1.4 contains fixes for more robust releasing of resources in failure scenarios. Is trying 1.1.4 an option?
> >>
> >> – Ufuk
> >>
> >>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <sca...@expedia.com> wrote:
> >>>
> >>> I'm having pretty frequent issues with the exception below. It basically always ends up killing my cluster after forcing a large number of job restarts. I just can't keep Flink up & running.
> >>>
> >>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the emrfs-site config fs.s3.maxConnections from the default (50) to 75, after AWS support told me the name of the config option. However, that hasn't fixed the problem. Assuming that increasing maxConnections again doesn't fix it, is there anything else I can do? Is anyone else having this problem? Is it possible that the state backend isn't properly calling close() on its filesystem objects? Or is there a large number of concurrently open filesystem objects for some reason? I am using the default checkpointing settings with one checkpoint at a time, checkpointing every 10 minutes. If I am reading the metrics correctly, the checkpoint duration is between 12s and 3 minutes on one of the jobs, and 5s or less on the other three. Any help is appreciated.
> >>>
> >>> java.lang.RuntimeException: Could not initialize state backend.
> >>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
> >>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
> >>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
> >>>     at java.lang.Thread.run(Thread.java:745)
> >>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
> >>>     at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
> >>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> >>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >>>     at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
> >>>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
> >>>     at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
> >>>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
> >>>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
> >>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
> >>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
> >>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
> >>>     ... 13 more
> >>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
> >>>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> >>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown Source)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
> >>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
> >>>     ... 41 more
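To make the failure mode concrete, here is a minimal sketch of the kind of connection leak described at the top of this message. The class and method names are made up (this is hypothetical user code, not taken from Shannon's job); the org.apache.flink.core.fs classes are the filesystem abstraction referred to above. The point is only that an unclosed stream keeps one pooled HTTP connection leased, and enough of them eventually produce the "Timeout waiting for connection from pool" error in the trace.

    import java.net.URI;
    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public class S3StreamLeakSketch {

        // Leaky variant: reads only a few bytes and returns without closing the
        // stream, so the pooled HTTP connection behind it is never released.
        static byte[] leakyReadHeader(String uri) throws Exception {
            FileSystem fs = FileSystem.get(URI.create(uri));
            FSDataInputStream in = fs.open(new Path(uri));
            byte[] header = new byte[16];
            in.read(header);   // partial read; return value ignored in this sketch
            return header;     // 'in' is never closed -> connection stays leased
        }

        // Fixed variant: try-with-resources closes the stream (and releases the
        // connection back to the pool) even if reading throws.
        static byte[] safeReadHeader(String uri) throws Exception {
            FileSystem fs = FileSystem.get(URI.create(uri));
            try (FSDataInputStream in = fs.open(new Path(uri))) {
                byte[] header = new byte[16];
                in.read(header);
                return header;
            }
        }
    }

The Flink 1.2 safety net would close such leaked streams when the task terminates; on 1.1.x, closing streams explicitly (as in the second variant, or with close() in a finally block) is the only protection.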
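For reference, a minimal sketch of the setup Shannon describes in the quoted message (RocksDB state backend checkpointing to S3 through EMRFS, one checkpoint at a time, every 10 minutes), assuming the standard Flink 1.1 streaming API. The class name and the s3:// path are made up, and the max-concurrent-checkpoints line only makes the default explicit.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint every 10 minutes; at most one checkpoint in flight at a time
            // (the latter is already the default).
            env.enableCheckpointing(10 * 60 * 1000);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

            // RocksDB state backend writing checkpoint data to S3 via the EMR
            // filesystem; the bucket and path are hypothetical.
            env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink/checkpoints"));

            // ... sources, transformations, sinks, and env.execute() would follow here ...
        }
    }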