Hi,

  I have a simple Flink job that reads data from a Kafka topic, computes
per-minute aggregations, and writes the results to Elasticsearch.
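
At a high level the job looks like this (a simplified sketch; Event, Aggregate,
MinuteAggregator, EventDeserializationSchema, kafkaProps, and esSinkBuilder are
placeholders for my actual classes and setup):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Source: the Kafka topic (topic name and consumer properties are placeholders)
    FlinkKafkaConsumer<Event> source =
            new FlinkKafkaConsumer<>("events", new EventDeserializationSchema(), kafkaProps);

    // Per-minute tumbling-window aggregation
    SingleOutputStreamOperator<Aggregate> aggregated = env
            .addSource(source)
            .keyBy(Event::getKey)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new MinuteAggregator());

    // Sink: Elasticsearch (ElasticsearchSink.Builder setup omitted)
    aggregated.addSink(esSinkBuilder.build());

    env.execute("minute-aggregations");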

  I am running the job in a Flink YARN session on an EMR cluster. It runs
fine for about an hour, then stops, and the logs show the following:

Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Unable to create Channel from class class org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel
        at org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46)
        at org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:309)
        at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159)
        at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:143)
        at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:127)
        at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:333)
        at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:272)
        at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:214)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$22(RestClusterClient.java:629)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedConstructorAccessor22.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44)
        ... 17 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Failed to open a socket.
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:70)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:87)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:80)
        ... 21 more
Caused by: java.net.SocketException: Too many open files
        at sun.nio.ch.Net.socket0(Native Method)
        at sun.nio.ch.Net.socket(Net.java:411)
        at sun.nio.ch.Net.socket(Net.java:404)
        at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
        at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:68)
        ... 23 more

In addition to the sink function that writes the data to Elasticsearch, I am
calling print() on the SingleOutputStreamOperator (the stream returned by the
tumbling-window aggregation).
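
Roughly like this (same placeholder names as in the sketch above):

    // DEV only: dump the windowed aggregates to the TaskManagers' stdout files
    aggregated.print();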

I am also calling DataStreamUtils.collect() on the same stream to log out its
contents.
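
Something like this (LOG is a placeholder for my logger; imports omitted):

    // DEV only: pull the aggregated records back to the client and log them
    Iterator<Aggregate> results = DataStreamUtils.collect(aggregated);
    while (results.hasNext()) {
        LOG.info("aggregate: {}", results.next());
    }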

These two are only enabled in the DEV environment.

I have updated limits.conf and also raised fs.file-max (fs.file-max = 2097152)
on the master node as well as on all the worker nodes, but I am still hitting
the same issue.

Thanks
Sateesh


