I haven't seen it yet; I'll let you know if I do. My last whole-cluster failure seems to have been caused by placing too much load on the cluster: we had a job that got up to 12 GB in checkpoint size, and the current cluster is 6x c3.2xlarge. The logs show a lot of "java.net.SocketException: Connection reset" when trying to write checkpoints to S3, as well as repeated disconnects/reconnects with ZooKeeper ("Client session timed out, have not heard from server in 28301ms for sessionid 0x254bb682e214f79, closing socket connection and attempting reconnect") and things like "akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.0.88.37:38768/user/taskmanager#-497097074]] after [10000 ms]". Generally, it seems as if the network got overwhelmed.
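
Those timeouts line up with configurable limits, so one thing worth trying is raising them; a minimal flink-conf.yaml sketch, assuming Flink 1.1-era key names (the values are illustrative only, not tested recommendations):

    # Akka ask timeout; the default of 10 s matches the
    # "Ask timed out ... after [10000 ms]" messages above.
    akka.ask.timeout: 60 s
    # ZooKeeper client session timeout in milliseconds (default 60000).
    recovery.zookeeper.client.session-timeout: 120000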
-Shannon

From: Stephan Ewen <se...@apache.org>
Date: Tuesday, January 24, 2017 at 8:30 AM
To: <user@flink.apache.org>
Subject: Re: Flink snapshotting to S3 - Timeout waiting for connection from pool

Hi Shannon!

I was wondering if you still see this issue in Flink 1.1.4?

Just thinking that another possible cause could be a connection leak somewhere (in Flink code, user code, or a vendor library), which would starve the S3 connector's connection pool.

For Flink 1.2, there is a safety net that tracks and closes streams that go through Flink's filesystem abstraction, so that should no longer be an issue there.

Best,
Stephan

On Fri, Jan 13, 2017 at 1:04 AM, Shannon Carey <sca...@expedia.com> wrote:

Good to know someone else has had the same problem... What did you do about it? Did it resolve on its own?

-Shannon

On 1/12/17, 11:55 AM, "Chen Qin" <qinnc...@gmail.com> wrote:

>We have seen this issue back to Flink 1.0. Our finding back then was
>traffic congestion to AWS on the internal network: many teams depend on
>S3 and the bandwidth is shared, which causes congestion from time to
>time.
>
>Hope it helps!
>
>Thanks
>Chen
>
>> On Jan 12, 2017, at 03:30, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Shannon!
>>
>> Is this always reproducible, and how long does it take to reproduce?
>>
>> I've not seen this error before, but as you say, it indicates that
>> some streams are not closed.
>>
>> Did the jobs do any restarts before this happened? Flink 1.1.4
>> contains fixes for more robust releasing of resources in failure
>> scenarios. Is trying 1.1.4 an option?
>>
>> – Ufuk
>>
>>> On Thu, Jan 12, 2017 at 1:18 AM, Shannon Carey <sca...@expedia.com> wrote:
>>> I'm having pretty frequent issues with the exception below. It
>>> basically always ends up killing my cluster after forcing a large
>>> number of job restarts. I just can't keep Flink up & running.
>>>
>>> I am running Flink 1.1.3 on EMR 5.2.0. I already tried updating the
>>> emrfs-site config fs.s3.maxConnections from the default (50) to 75,
>>> after AWS support told me the name of the config option. However,
>>> that hasn't fixed the problem. Assuming that increasing
>>> maxConnections again doesn't fix the problem, is there anything else
>>> I can do? Is anyone else having this problem? Is it possible that
>>> the state backend isn't properly calling close() on its filesystem
>>> objects? Or is there a large number of concurrent open filesystem
>>> objects for some reason? I am using the default checkpointing
>>> settings, with one checkpoint at a time and a checkpoint every 10
>>> minutes. If I am reading the metrics correctly, the checkpoint
>>> duration is between 12s and 3 minutes on one of the jobs, and 5s or
>>> less on the other three.
>>>
>>> Any help is appreciated.
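
For reference, on EMR the fs.s3.maxConnections setting is typically applied through the "emrfs-site" configuration classification at cluster creation; a minimal sketch (the value 100 is illustrative only, not a tested recommendation):

    [
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.maxConnections": "100"
        }
      }
    ]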
>>>
>>> java.lang.RuntimeException: Could not initialize state backend.
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:121)
>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:276)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:271)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:212)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:105)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
>>> at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>> at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:7)
>>> at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
>>> at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>>> at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
>>> at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
>>> at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
>>> at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>> at com.sun.proxy.$Proxy34.retrieveMetadata(Unknown Source)
>>> at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:764)
>>> at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdir(S3NativeFileSystem.java:1169)
>>> at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.mkdirs(S3NativeFileSystem.java:1162)
>>> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
>>> at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.mkdirs(EmrFileSystem.java:399)
>>> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:429)
>>> at org.apache.flink.runtime.state.filesystem.FsStateBackend.initializeForJob(FsStateBackend.java:249)
>>> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.initializeForJob(RocksDBStateBackend.java:237)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:718)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:119)
>>> ... 13 more
>>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>>> at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
>>> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.getConnection(Unknown Source)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
>>> at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
>>> ... 41 more
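
On the connection-leak theory Stephan raises above, a minimal, hypothetical Java sketch of the pattern to look for in user code: a stream opened through Flink's core filesystem API (org.apache.flink.core.fs) that is never closed on the failure path, alongside the try-with-resources fix. The class and method names here are invented for illustration.

    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public class StreamLeakSketch {

        // Leaky: if read() throws (e.g. a connection reset), close() is
        // never reached, and the pooled S3 connection is not returned.
        static long countBytesLeaky(Path path) throws Exception {
            FileSystem fs = path.getFileSystem();
            FSDataInputStream in = fs.open(path);
            long n = 0;
            while (in.read() != -1) {
                n++;
            }
            in.close(); // skipped whenever an exception is thrown above
            return n;
        }

        // Safe: try-with-resources closes the stream on every code path,
        // so the connection always goes back to the pool.
        static long countBytesSafe(Path path) throws Exception {
            FileSystem fs = path.getFileSystem();
            try (FSDataInputStream in = fs.open(path)) {
                long n = 0;
                while (in.read() != -1) {
                    n++;
                }
                return n;
            }
        }
    }

Enough leaks of this kind would starve the pool and produce exactly the ConnectionPoolTimeoutException above, which is why the Flink 1.2 safety net tracks streams opened through the filesystem abstraction.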