Hi Mars,

A few questions:

1. What version of Flink are you using?

2. Are you using the default ES sink, or did you write your own?

3. What class of EC2 slave are you using?

4. What’s the parallelism of the ES sink?

5. To verify the actual open file limit, you need to…

 * scp your private key to the EMR master
 * ssh onto the EMR master
 * ssh (using the same key) from the master to one of the EMR slaves
 * sudo -u yarn bash -c 'ulimit -a'
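
For reference, the steps above look roughly like this (the host names, key file, and `hadoop` login below are placeholders; substitute your own cluster's values):

```shell
# Copy your key to the EMR master, then hop from the master to a slave node.
# "emr-master" and "emr-slave" are placeholder host names.
scp -i my-key.pem my-key.pem hadoop@emr-master:~/
ssh -i my-key.pem hadoop@emr-master
ssh -i my-key.pem hadoop@emr-slave

# Check the limits that the yarn user (which runs the TaskManagers) actually gets.
# This can differ from what limits.conf says, e.g. for non-login processes.
sudo -u yarn bash -c 'ulimit -a'

# Just the open-file limit:
sudo -u yarn bash -c 'ulimit -n'
```

The `sudo -u yarn` part matters: checking `ulimit -n` as your own login user can show a different value than what the YARN containers inherit.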

For many classes of EC2 servers, you get 32K max open files.

I haven’t looked at exactly how the ES sink configures things, but the ES REST 
client (by default) has a connection pool of size 30, and these connections 
aren’t closed immediately.

Each connection uses 3 file descriptors (one a_inode, two FIFO). So you could 
get about 100 open files per sink sub-task.
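
If you want to sanity-check that estimate on a live TaskManager, you can count its open descriptors via /proc (the `pgrep` pattern below is just one way to find the TaskManager process; adjust it for your deployment):

```shell
# Find a TaskManager pid (hypothetical grep pattern; adjust for your setup).
pid=$(pgrep -f TaskManagerRunner | head -n 1)

# Total open file descriptors for that process.
sudo ls /proc/"$pid"/fd | wc -l

# Break the descriptors down by type: socket, pipe (FIFO), anon_inode, etc.
sudo ls -l /proc/"$pid"/fd | awk -F'-> ' 'NF > 1 {print $2}' \
  | cut -d: -f1 | sort | uniq -c | sort -rn
```

Watching that count over time should tell you whether the ES sink's connections are what's eating descriptors, or whether something else is leaking.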

— Ken

PS - calling collect() or print() on a data stream only makes sense if the 
stream is tiny. Can you use PrintSinkFunction() instead?

> On Sep 25, 2020, at 2:24 PM, mars <sk_ac...@yahoo.com> wrote:
> 
> Hi,
> 
>  I have a simple Flink job that reads data from a Kafka topic, generates
> per-minute aggregations, and writes them to Elasticsearch.
> 
>  I am running the Flink job (Flink YARN session) on an EMR cluster. The job
> runs fine for an hour, then stops, and when I checked the logs I saw the
> following.
> 
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelException: Unable to
> create Channel from class class
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel
>       at
> org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46)
>       at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:309)
>       at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159)
>       at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:143)
>       at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:127)
>       at
> org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:333)
>       at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:272)
>       at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:214)
>       at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$null$22(RestClusterClient.java:629)
>       at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>       at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>       at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>       at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>       at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
>       at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>       at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       ... 1 more
> Caused by: java.lang.reflect.InvocationTargetException
>       at sun.reflect.GeneratedConstructorAccessor22.newInstance(Unknown 
> Source)
>       at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at
> org.apache.flink.shaded.netty4.io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44)
>       ... 17 more
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ChannelException:
> Failed to open a socket.
>       at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:70)
>       at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:87)
>       at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:80)
>       ... 21 more
> Caused by: java.net.SocketException: Too many open files
>       at sun.nio.ch.Net.socket0(Native Method)
>       at sun.nio.ch.Net.socket(Net.java:411)
>       at sun.nio.ch.Net.socket(Net.java:404)
>       at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:102)
>       at
> sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60)
>       at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:68)
>       ... 23 more
> 
> Along with calling the sink function that writes the data to Elasticsearch,
> I am calling print() on the SingleOutputStreamOperator (the stream returned
> once I calculate the aggregation based on a tumbling window).
> 
> I am also calling DataStreamUtils.collect() on the same stream to log out
> the info in the stream.
> 
> These two are only enabled in the DEV environment.
> 
> I have updated limits.conf and also set fs.file-max (fs.file-max = 2097152)
> on the master node as well as on all the worker nodes, and I am still
> getting the same issue.
> 
> Thanks
> Sateesh
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
