Thanks for the details. I got it working. I have roughly one directory for
each month and I am running over 12-15 months of data, so I created a
dataset for each month and unioned them.
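
A minimal sketch of that per-month approach, for illustration only. The
bucket name, the yyyy-MM directory layout, and the MonthlyPaths helper are
hypothetical placeholders, not my actual job; only the path generation is
runnable here, and the Flink calls are mentioned in comments.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

class MonthlyPaths {
    // Assumed layout (hypothetical): one directory per month,
    // e.g. s3://my-bucket/data/2017-06/
    static List<String> monthPaths(String baseUri, YearMonth first, int months) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM");
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < months; i++) {
            paths.add(baseUri + "/" + first.plusMonths(i).format(fmt) + "/");
        }
        return paths;
    }

    public static void main(String[] args) {
        // 15 consecutive months starting at an arbitrary example month.
        List<String> paths = monthPaths("s3://my-bucket/data", YearMonth.of(2017, 6), 15);
        // In the Flink job, each path would become one DataSet (for example
        // via env.readTextFile(path)), and the per-month DataSets would then
        // be combined with union(...).
        paths.forEach(System.out::println);
    }
}
```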

However, when I run the job I get an HTTP timeout. In total I am reading
more than 120K files across all the months.

I am using S3 and EMR, and the Flink version is 1.4.2. When I run over
6 months of data, it works fine.

Below is part of the error:

Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:705)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:477)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:48)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.perform(GetObjectCall.java:9)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:80)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObject(AmazonS3LiteClient.java:99)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:452)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrievePair(Jets3tNativeFileSystemStore.java:439)
    at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
    at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1213)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:865)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy30.get(Unknown Source)

Thanks

On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Flink InputFormats generate their InputSplits sequentially on the
> JobManager.
> These splits are stored in the heap of the JM process and handed out to
> SourceTasks when they request them lazily.
> Split assignment is done by an InputSplitAssigner, which can be customized.
> FileInputFormats typically use a LocatableInputSplitAssigner which tries to
> assign splits based on locality.
>
> I see three potential problems:
> 1) InputSplit generation might take a long while. The JM is blocked until
> splits are generated.
> 2) All InputSplits need to be stored on the JM heap. You might need to
> assign more memory to the JM process.
> 3) Split assignment might take a while depending on the complexity of the
> InputSplitAssigner. You can implement a custom assigner to make this more
> efficient (from an assignment point of view).
>
> Best, Fabian
>
> 2018-08-14 8:19 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:
>
>> It causes more overhead (processes etc) which might make it slower.
>> Furthermore if you have them stored on HDFS then the bottleneck is the
>> namenode which will have to answer millions of requests.
>> The latter point will change in future Hadoop versions with
>> http://ozone.hadoop.apache.org/
>>
>> On 13. Aug 2018, at 21:01, Darshan Singh <darshan.m...@gmail.com> wrote:
>>
>> Hi Guys,
>>
>> Is there a limit on number of files flink dataset can read? My question
>> is will there be any sort of issues if I have say millions of files to read
>> to create single dataset.
>>
>> Thanks
>>
>>
>
