Hi,

I have a simple Flink job that reads data from a Kafka topic, generates per-minute aggregations, and writes them to Elasticsearch.
I am running the job in a Flink YARN session on an EMR cluster. The job runs fine for an hour and then stops. When I checked the logs, I saw the following:

Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Unable to create Channel from class class org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel
    at org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46)
    at org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:309)
    at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159)
    at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:143)
    at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:127)
    at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:333)
    at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:272)
    at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:214)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$22(RestClusterClient.java:629)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedConstructorAccessor22.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44)
    ... 17 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Failed to open a socket.
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:70)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:87)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:80)
    ... 21 more
Caused by: java.net.SocketException: Too many open files
    at sun.nio.ch.Net.socket0(Native Method)
    at sun.nio.ch.Net.socket(Net.java:411)
    at sun.nio.ch.Net.socket(Net.java:404)
    at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
    at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:68)
    ... 23 more

Along with the sink function that writes the data to Elasticsearch, I am calling print() on the SingleOutputStreamOperator (the stream that is returned once I calculate the aggregation over the tumbling window). I am also calling DataStreamUtils.collect() on the same stream to log the records in it. These two calls are enabled only in the DEV environment.

I have updated limits.conf and also set file-max (fs.file-max = 2097152) on the master node as well as on all worker nodes, and I am still getting the same issue.

Thanks
Sateesh
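
One thing that may be worth verifying is whether the new limits actually reach the JVM that throws the exception. fs.file-max is the system-wide ceiling, whereas "Too many open files" is raised when a single process hits its own per-process limit, and the trace above goes through RestClusterClient, so the affected JVM may be the client process. The following is only a minimal sketch of how the open and maximum descriptor counts could be read from inside a JVM. The class name FdUsage is hypothetical, and it assumes a HotSpot/OpenJDK JVM on a Unix-like system, where the platform bean implements com.sun.management.UnixOperatingSystemMXBean.

```java
import com.sun.management.UnixOperatingSystemMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper, not part of Flink: reports file descriptor usage of the current JVM.
final class FdUsage {

    /**
     * Returns {openDescriptors, maxDescriptors} for this JVM,
     * or null when the platform bean does not expose them (e.g. on Windows).
     */
    static long[] snapshot() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unix = (UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] fds = snapshot();
        if (fds == null) {
            System.out.println("File descriptor counts are not available on this JVM");
            return;
        }
        System.out.println("open=" + fds[0] + " max=" + fds[1]);
    }
}
```

Logging these two numbers periodically from the process that runs DataStreamUtils.collect() would show whether the open count climbs steadily over the hour (which would point to a descriptor leak rather than a limit that is too low) and whether the maximum reflects the value configured in limits.conf.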