Happy to hear it! On Mon, Oct 17, 2016 at 9:31 AM, Yassine MARZOUGUI < y.marzou...@mindlytix.com> wrote:
> That solved my problem, Thank you! > > Best, > Yassine > > 2016-10-16 19:18 GMT+02:00 Stephan Ewen <se...@apache.org>: > >> Hi! >> >> Looks to me that this is the following problem: The Decompression Streams >> did not properly forward the "close()" calls. >> >> It is in the lastest 1.2-SNAPSHOT, but did not make it into version >> 1.1.3. >> The fix is in that pull request: https://github.com/apache/flin >> k/pull/2581 >> >> I have pushed the fix into the latest 1.1-SNAPSHOT branch. >> >> If you get the code via "git clone -b release-1.1 >> https://github.com/apache/flink.git" you will get the code that is the >> same as the 1.1.3 release, plus the patch to this problem. >> >> Greetings, >> Stephan >> >> >> On Sat, Oct 15, 2016 at 10:11 PM, Yassine MARZOUGUI < >> y.marzou...@mindlytix.com> wrote: >> >>> Hi all, >>> >>> I'm reading a large number of small files from HDFS in batch mode (about >>> 20 directories, each directory contains about 3000 files, using >>> recursive.file.enumeration=true), and each time, at about 200 GB of >>> received data, my job fails with the following exception: >>> >>> java.io.IOException: Error opening the Input Split >>> hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: >>> BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 >>> file=/filepath/filename.csv.gz >>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu >>> tFormat.java:693) >>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del >>> imitedInputFormat.java:424) >>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del >>> imitedInputFormat.java:47) >>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(Dat >>> aSourceTask.java:140) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>> at java.lang.Thread.run(Unknown Source) >>> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not >>> obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 >>> file=/filepath/filename.csv.gz >>> at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInpu >>> tStream.java:984) >>> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputSt >>> ream.java:642) >>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSIn >>> putStream.java:882) >>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.ja >>> va:934) >>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.ja >>> va:735) >>> at java.io.FilterInputStream.read(Unknown Source) >>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read( >>> HadoopDataInputStream.java:59) >>> at java.util.zip.CheckedInputStream.read(Unknown Source) >>> at java.util.zip.GZIPInputStream.readUByte(Unknown Source) >>> at java.util.zip.GZIPInputStream.readUShort(Unknown Source) >>> at java.util.zip.GZIPInputStream.readHeader(Unknown Source) >>> at java.util.zip.GZIPInputStream.<init>(Unknown Source) >>> at java.util.zip.GZIPInputStream.<init>(Unknown Source) >>> at org.apache.flink.api.common.io.compression.GzipInflaterInput >>> StreamFactory.create(GzipInflaterInputStreamFactory.java:44) >>> at org.apache.flink.api.common.io.compression.GzipInflaterInput >>> StreamFactory.create(GzipInflaterInputStreamFactory.java:31) >>> at org.apache.flink.api.common.io.FileInputFormat.decorateInput >>> Stream(FileInputFormat.java:717) >>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu >>> tFormat.java:689) >>> ... 5 more >>> >>> I checked the file each time and it exists and is healthy. Looking at >>> the taskmanager logs, I found the following exceptions which suggests it is >>> running out of connections: >>> >>> 2016-10-15 18:20:27,034 WARN org.apache.hadoop.hdfs.BlockReaderFactory >>> - I/O error constructing remote block reader. >>> java.net.SocketException: No buffer space available (maximum connections >>> reached?): connect >>> at sun.nio.ch.Net.connect0(Native Method) >>> at sun.nio.ch.Net.connect(Unknown Source) >>> at sun.nio.ch.Net.connect(Unknown Source) >>> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source) >>> at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWi >>> thTimeout.java:192) >>> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531) >>> at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient. >>> java:3436) >>> at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockR >>> eaderFactory.java:777) >>> at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockRead >>> erFromTcp(BlockReaderFactory.java:694) >>> at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderF >>> actory.java:355) >>> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputSt >>> ream.java:673) >>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSIn >>> putStream.java:882) >>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934) >>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735) >>> at java.io.FilterInputStream.read(Unknown Source) >>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read( >>> HadoopDataInputStream.java:59) >>> at java.util.zip.CheckedInputStream.read(Unknown Source) >>> at java.util.zip.GZIPInputStream.readUByte(Unknown Source) >>> at java.util.zip.GZIPInputStream.readUShort(Unknown Source) >>> at java.util.zip.GZIPInputStream.readHeader(Unknown Source) >>> at java.util.zip.GZIPInputStream.<init>(Unknown Source) >>> at java.util.zip.GZIPInputStream.<init>(Unknown Source) >>> at org.apache.flink.api.common.io.compression.GzipInflaterInput >>> StreamFactory.create(GzipInflaterInputStreamFactory.java:44) >>> at org.apache.flink.api.common.io.compression.GzipInflaterInput >>> StreamFactory.create(GzipInflaterInputStreamFactory.java:31) >>> at org.apache.flink.api.common.io.FileInputFormat.decorateInput >>> Stream(FileInputFormat.java:717) >>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu >>> tFormat.java:689) >>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del >>> imitedInputFormat.java:424) >>> at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputForma >>> t.java:99) >>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del >>> imitedInputFormat.java:47) >>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(Dat >>> aSourceTask.java:140) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>> at java.lang.Thread.run(Unknown Source) >>> 2016-10-15 18:20:27,034 WARN org.apache.hadoop.hdfs.DFSClient >>> - Failed to connect to /x.x.x.x:50010 for block, add >>> to deadNodes and continue. java.net.SocketException: No buffer space >>> available (maximum connections reached?): connect >>> java.net.SocketException: No buffer space available (maximum connections >>> reached?): connect >>> at sun.nio.ch.Net.connect0(Native Method) >>> at sun.nio.ch.Net.connect(Unknown Source) >>> at sun.nio.ch.Net.connect(Unknown Source) >>> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source) >>> at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWi >>> thTimeout.java:192) >>> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531) >>> at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient. >>> java:3436) >>> at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockR >>> eaderFactory.java:777) >>> at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockRead >>> erFromTcp(BlockReaderFactory.java:694) >>> at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderF >>> actory.java:355) >>> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputSt >>> ream.java:673) >>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSIn >>> putStream.java:882) >>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934) >>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735) >>> at java.io.FilterInputStream.read(Unknown Source) >>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read( >>> HadoopDataInputStream.java:59) >>> at java.util.zip.CheckedInputStream.read(Unknown Source) >>> at java.util.zip.GZIPInputStream.readUByte(Unknown Source) >>> at java.util.zip.GZIPInputStream.readUShort(Unknown Source) >>> at java.util.zip.GZIPInputStream.readHeader(Unknown Source) >>> at java.util.zip.GZIPInputStream.<init>(Unknown Source) >>> at java.util.zip.GZIPInputStream.<init>(Unknown Source) >>> at org.apache.flink.api.common.io.compression.GzipInflaterInput >>> StreamFactory.create(GzipInflaterInputStreamFactory.java:44) >>> at org.apache.flink.api.common.io.compression.GzipInflaterInput >>> StreamFactory.create(GzipInflaterInputStreamFactory.java:31) >>> at org.apache.flink.api.common.io.FileInputFormat.decorateInput >>> Stream(FileInputFormat.java:717) >>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu >>> tFormat.java:689) >>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del >>> imitedInputFormat.java:424) >>> at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputForma >>> t.java:99) >>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del >>> imitedInputFormat.java:47) >>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(Dat >>> aSourceTask.java:140) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>> at java.lang.Thread.run(Unknown Source) >>> >>> I inspected the open connections, and found that a very large number of >>> connections are opened by the job and stuck on the CLOSE_WAIT status, which >>> I guess exhausted the ephemeral port space after some time. >>> I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager), and >>> using a prallelism of 8. I got the same exception even with a job >>> paralellism set to 1. The same exception happened after upgrading to Flink >>> 1.1.3 too. >>> >>> Any idea what could be the root cause of the problem and how to solve it? >>> Thank you. >>> >>> Best, >>> Yassine >>> >> >> >