Hi Darshan,

This looks like a file system configuration issue to me. Flink supports different file system implementations for S3, and there are also a few tuning knobs. Did you have a look at the docs for file system configuration [1]?
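The stack trace in your mail comes from the EMRFS S3 client (com.amazon.ws.emr.hadoop.fs), and "Timeout waiting for connection from pool" usually means its HTTP connection pool is too small for the number of S3 objects being opened in parallel. As far as I remember, the pool size on EMR is controlled by fs.s3.maxConnections in emrfs-site.xml (please double-check the property name and a sensible value against the EMR docs), e.g. something like:

  <property>
    <name>fs.s3.maxConnections</name>
    <value>500</value>
  </property>

If you use Hadoop's s3a file system instead, the equivalent knob should be fs.s3a.connection.maximum in core-site.xml.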
Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/filesystems.html

2018-08-14 20:45 GMT+02:00 Darshan Singh <darshan.m...@gmail.com>:

> Thanks for the details. I got it working. I have around one directory for
> each month, and I am running over 12-15 months of data, so I created a
> dataset from each month and did a union.
>
> However, when I run it I get the HTTP timeout issue. I am reading more than
> 120K files in total across all the months.
>
> I am using S3 and EMR to do this, and the Flink version is 1.4.2. When I run
> for 6 months, this works fine.
>
> Below is part of the error:
>
> Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from pool
>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
>     at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>     at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
>     at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
>     at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy30.get(Unknown Source)
>
> Thanks
>
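For reference, the per-month read plus union you describe maps quite directly onto the DataSet API. Roughly like this (the bucket name and month list below are just placeholders, not your actual layout):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MonthlyUnionJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // one directory per month, e.g. s3://my-bucket/data/2017-01/ (placeholder path)
        String[] months = {"2017-01", "2017-02", "2017-03"};

        DataSet<String> all = null;
        for (String month : months) {
            // reading a directory picks up all files directly inside it
            DataSet<String> monthly = env.readTextFile("s3://my-bucket/data/" + month + "/");
            all = (all == null) ? monthly : all.union(monthly);
        }

        // stand-in for the real processing; triggers the job
        all.print();
    }
}

The unions themselves are cheap plan nodes, so 12-15 of them are not the problem; the expensive part with ~120K files is enumerating the splits and opening that many S3 objects, which is where the connection pool above and the split handling below come in.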
> On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink InputFormats generate their InputSplits sequentially on the
>> JobManager. These splits are stored in the heap of the JM process and
>> handed out lazily to SourceTasks when they request them.
>> Split assignment is done by an InputSplitAssigner, which can be customized.
>> FileInputFormats typically use a LocatableInputSplitAssigner, which tries to
>> assign splits based on locality.
>>
>> I see three potential problems:
>> 1) InputSplit generation might take a long while. The JM is blocked until
>> the splits are generated.
>> 2) All InputSplits need to be stored on the JM heap. You might need to
>> assign more memory to the JM process.
>> 3) Split assignment might take a while, depending on the complexity of the
>> InputSplitAssigner. You can implement a custom assigner to make this more
>> efficient (from an assignment point of view).
>>
>> Best, Fabian
>>
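To make point 3) concrete: a minimal assigner that skips the locality matching and simply hands splits out first-come, first-served could look roughly like the sketch below. This is untested and assumes the 1.4/1.5 interface, where, as far as I remember, InputSplitAssigner only declares getNextInputSplit:

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

/** Hands out splits in FIFO order, ignoring locality, so each request is O(1). */
public class FifoInputSplitAssigner implements InputSplitAssigner {

    private final Queue<InputSplit> pendingSplits;

    public FifoInputSplitAssigner(InputSplit[] splits) {
        this.pendingSplits = new ConcurrentLinkedQueue<>(Arrays.asList(splits));
    }

    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        // null tells the requesting source task that no splits are left
        return pendingSplits.poll();
    }
}

You would return such an assigner from your InputFormat's getInputSplitAssigner(...) override; check the exact signature in your Flink version. I believe Flink already ships something very similar as DefaultInputSplitAssigner, so reusing that may be enough.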
>> 2018-08-14 8:19 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:
>>
>>> It causes more overhead (processes etc.), which might make it slower.
>>> Furthermore, if you have them stored on HDFS, then the bottleneck is the
>>> NameNode, which will have to answer millions of requests.
>>> The latter point will change in future Hadoop versions with
>>> http://ozone.hadoop.apache.org/
>>>
>>> On 13. Aug 2018, at 21:01, Darshan Singh <darshan.m...@gmail.com> wrote:
>>>
>>> Hi Guys,
>>>
>>> Is there a limit on the number of files a Flink dataset can read? My
>>> question is: will there be any sort of issue if I have, say, millions of
>>> files to read to create a single dataset?
>>>
>>> Thanks