Thanks for the details. I got it working. I have roughly one directory per month and I am running over 12-15 months of data, so I created a dataset from each month and unioned them.
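Roughly what the job does is sketched below. This is simplified: the bucket layout, paths, and record type are placeholders, the real job reads our own record format and many more month directories.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MonthlyUnionJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder month directories; the real job has 12-15 of them.
        List<String> monthDirs = Arrays.asList(
                "s3://my-bucket/data/2018-01/",
                "s3://my-bucket/data/2018-02/");

        // Build one DataSet per month directory and union them into a single DataSet.
        DataSet<String> all = null;
        for (String dir : monthDirs) {
            DataSet<String> month = env.readTextFile(dir);
            all = (all == null) ? month : all.union(month);
        }

        all.writeAsText("s3://my-bucket/output/");
        env.execute("monthly union");
    }
}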
However, when I run it I hit an HTTP timeout. I am reading more than 120K files in total across all months. I am using S3 and EMR, and the Flink version is 1.4.2. When I run for only 6 months it works fine. Below is part of the error:

Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
    at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy30.get(Unknown Source)
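Side note, in case it is relevant: the "Timeout waiting for connection from pool" comes from the HTTP connection pool of the EMRFS S3 client (the shaded PoolingHttpClientConnectionManager in the trace). If I read the EMR docs correctly, the pool size is controlled by fs.s3.maxConnections in the emrfs-site classification; I have not verified this myself, and the value below is only an example, not a confirmed fix:

[
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.maxConnections": "500"
    }
  }
]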
Thanks

On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> Flink InputFormats generate their InputSplits sequentially on the JobManager.
> These splits are stored in the heap of the JM process and handed out lazily to SourceTasks when they request them.
> Split assignment is done by an InputSplitAssigner, which can be customized.
> FileInputFormats typically use a LocatableInputSplitAssigner, which tries to assign splits based on locality.
>
> I see three potential problems:
> 1) InputSplit generation might take a long while. The JM is blocked until splits are generated.
> 2) All InputSplits need to be stored on the JM heap. You might need to assign more memory to the JM process.
> 3) Split assignment might take a while depending on the complexity of the InputSplitAssigner. You can implement a custom assigner to make this more efficient (from an assignment point of view).
>
> Best, Fabian
>
> 2018-08-14 8:19 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:
>
>> It causes more overhead (processes etc.) which might make it slower.
>> Furthermore, if you have them stored on HDFS then the bottleneck is the namenode, which will have to answer millions of requests.
>> The latter point will change in future Hadoop versions with http://ozone.hadoop.apache.org/
>>
>> On 13. Aug 2018, at 21:01, Darshan Singh <darshan.m...@gmail.com> wrote:
>>
>> Hi Guys,
>>
>> Is there a limit on the number of files a Flink dataset can read? My question is: will there be any sort of issue if I have, say, millions of files to read to create a single dataset?
>>
>> Thanks
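PS: Regarding point 3 above, a minimal sketch of what a custom assigner could look like, assuming the Flink 1.4 InputSplitAssigner interface. This plain FIFO version ignores locality entirely and is only an illustration of the customization point, not something I have run:

import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

// Hypothetical assigner: hands out splits in arrival order, O(1) per request,
// without any locality matching.
public class FifoInputSplitAssigner implements InputSplitAssigner {

    private final ConcurrentLinkedQueue<InputSplit> pending = new ConcurrentLinkedQueue<>();

    public FifoInputSplitAssigner(InputSplit[] splits) {
        for (InputSplit split : splits) {
            pending.add(split);
        }
    }

    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        // Returning null signals the requesting source task that no splits are left.
        return pending.poll();
    }
}

From what I can tell this would be returned from the input format's getInputSplitAssigner(...), though for FileInputFormat subclasses the declared return type there may force extending LocatableInputSplitAssigner instead; I have not tried either.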