That solved my problem, thank you!

Best,
Yassine

2016-10-16 19:18 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> Looks to me that this is the following problem: The Decompression Streams
> did not properly forward the "close()" calls.
>
> The fix is in the latest 1.2-SNAPSHOT, but did not make it into version 1.1.3.
> It is in this pull request: https://github.com/apache/flink/pull/2581
>
> I have pushed the fix into the latest 1.1-SNAPSHOT branch.
>
> If you get the code via
> "git clone -b release-1.1 https://github.com/apache/flink.git"
> you will get the code that is the same as the 1.1.3 release, plus the
> patch for this problem.
>
> Greetings,
> Stephan
>
>
> On Sat, Oct 15, 2016 at 10:11 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> I'm reading a large number of small files from HDFS in batch mode (about
>> 20 directories, each containing about 3000 files, using
>> recursive.file.enumeration=true), and each time, at about 200 GB of
>> received data, my job fails with the following exception:
>>
>> java.io.IOException: Error opening the Input Split hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
>>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
>>     at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
>>     at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
>>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>>     at java.io.FilterInputStream.read(Unknown Source)
>>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>>     at java.util.zip.CheckedInputStream.read(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>>     at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>>     ... 5 more
>>
>> I checked the file each time and it exists and is healthy. Looking at the
>> taskmanager logs, I found the following exceptions, which suggest it is
>> running out of connections:
>>
>> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
>> java.net.SocketException: No buffer space available (maximum connections reached?): connect
>>     at sun.nio.ch.Net.connect0(Native Method)
>>     at sun.nio.ch.Net.connect(Unknown Source)
>>     at sun.nio.ch.Net.connect(Unknown Source)
>>     at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>>     at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
>>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>>     at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
>>     at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
>>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>>     at java.io.FilterInputStream.read(Unknown Source)
>>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>>     at java.util.zip.CheckedInputStream.read(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>>     at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>>     at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Unknown Source)
>> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.DFSClient - Failed to connect to /x.x.x.x:50010 for block, add to deadNodes and continue. java.net.SocketException: No buffer space available (maximum connections reached?): connect
>> java.net.SocketException: No buffer space available (maximum connections reached?): connect
>>     at sun.nio.ch.Net.connect0(Native Method)
>>     at sun.nio.ch.Net.connect(Unknown Source)
>>     at sun.nio.ch.Net.connect(Unknown Source)
>>     at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>>     at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
>>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>>     at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
>>     at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
>>     at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
>>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>>     at java.io.FilterInputStream.read(Unknown Source)
>>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>>     at java.util.zip.CheckedInputStream.read(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>>     at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>>     at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>>     at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Unknown Source)
>>
>> I inspected the open connections and found that a very large number of
>> connections are opened by the job and stuck in the CLOSE_WAIT state, which
>> I guess exhausted the ephemeral port space after some time.
>> I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager), with
>> a parallelism of 8. I got the same exception even with the job
>> parallelism set to 1. The same exception happened after upgrading to Flink
>> 1.1.3 too.
>>
>> Any idea what could be the root cause of the problem and how to solve it?
>> Thank you.
>>
>> Best,
>> Yassine
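
Note on the diagnosis in this thread: below is a minimal Java sketch of the general bug class Stephan describes, a wrapper stream that does not forward close() to the stream it wraps. It is not the Flink code from the pull request. All names in it (CloseForwardingSketch, TrackingStream, LeakyWrapper, ForwardingWrapper, underlyingClosedAfterWrapperClose) are hypothetical and chosen only for illustration. The sketch relies on the fact that java.io.InputStream.close() does nothing by default, so a wrapper that overrides read() but not close() leaves the wrapped stream open. If the wrapped stream holds a network connection, that connection would stay open as well, which is consistent with the pile-up of CLOSE_WAIT sockets reported above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class CloseForwardingSketch {

    /** Stand-in for a stream that owns a connection; records whether close() reached it. */
    static class TrackingStream extends ByteArrayInputStream {
        boolean closed = false;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Buggy pattern: delegates read() but inherits InputStream's no-op close(). */
    static class LeakyWrapper extends InputStream {
        protected final InputStream in;

        LeakyWrapper(InputStream in) {
            this.in = in;
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }
    }

    /** Fixed pattern: close() is forwarded to the wrapped stream. */
    static class ForwardingWrapper extends LeakyWrapper {
        ForwardingWrapper(InputStream in) {
            super(in);
        }

        @Override
        public void close() throws IOException {
            in.close();
        }
    }

    /** Reads one byte through a wrapper, closes the wrapper, and reports whether the wrapped stream was closed. */
    static boolean underlyingClosedAfterWrapperClose(boolean forwardClose) {
        TrackingStream underlying = new TrackingStream(new byte[] {1, 2, 3});
        InputStream wrapper = forwardClose
                ? new ForwardingWrapper(underlying)
                : new LeakyWrapper(underlying);
        try {
            wrapper.read();
            wrapper.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return underlying.closed;
    }

    public static void main(String[] args) {
        System.out.println("leaky wrapper closed underlying: "
                + underlyingClosedAfterWrapperClose(false));
        System.out.println("forwarding wrapper closed underlying: "
                + underlyingClosedAfterWrapperClose(true));
    }
}
```

Running main prints false for the leaky wrapper and true for the forwarding one. With thousands of small files, each leaked stream in the first pattern would keep one more connection open until the process ends.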