Hi Darshan,

This looks like a file system configuration issue to me.
Flink supports different file systems for S3 and there are also a few
tuning knobs.

Did you have a look at the docs for file system configuration [1]?
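
As a rough illustration only (an assumption on my side, not something
confirmed in this thread): the stack trace goes through EMR's own S3 file
system (the com.amazon.ws.emr.hadoop.fs classes), and as far as I know its
HTTP connection pool size is controlled by fs.s3.maxConnections in
emrfs-site.xml. The value below is a placeholder, not a recommendation.

```xml
<!-- emrfs-site.xml on the EMR nodes; 1000 is an illustrative value only -->
<property>
  <name>fs.s3.maxConnections</name>
  <value>1000</value>
</property>
```

If you switch to one of the S3 file systems described in [1] instead, the
corresponding settings are named differently, so please check the docs for
the file system you actually use.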

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/filesystems.html

2018-08-14 20:45 GMT+02:00 Darshan Singh <darshan.m...@gmail.com>:

> Thanks for the details. I got it working. I have roughly one directory per
> month and I am processing 12-15 months of data. So I created a DataSet
> for each month and unioned them.
>
> However, when I run it I get an HTTP timeout. I am reading more than
> 120K files in total across all the months.
>
> I am using S3 and EMR, and the Flink version is 1.4.2. When I run it
> for 6 months of data, it works fine.
>
> Below is part of the error:
>
> Caused by: java.io.IOException: Error opening the Input Split s3://XXXX.gz
> [0,-1]: Unable to execute HTTP request: Timeout waiting for connection from
> pool
>     at org.apache.flink.api.common.io.FileInputFormat.open(
> FileInputFormat.java:705)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(
> DelimitedInputFormat.java:477)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(
> DelimitedInputFormat.java:48)
>     at org.apache.flink.runtime.operators.DataSourceTask.
> invoke(DataSourceTask.java:145)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException:
> Unable to execute HTTP request: Timeout waiting for connection from pool
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.handleRetryableException(
> AmazonHttpClient.java:1114)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.executeWithTimer(
> AmazonHttpClient.java:717)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient$RequestExecutionBuilderImpl.
> execute(AmazonHttpClient.java:649)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.
> AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.
> s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.
> s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.
> s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.
> perform(GetObjectCall.java:22)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectCall.
> perform(GetObjectCall.java:9)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.
> GlobalS3Executor.execute(GlobalS3Executor.java:80)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.
> invoke(AmazonS3LiteClient.java:176)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.
> getObject(AmazonS3LiteClient.java:99)
>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.
> retrievePair(Jets3tNativeFileSystemStore.java:452)
>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.
> retrievePair(Jets3tNativeFileSystemStore.java:439)
>     at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> RetryInvocationHandler.java:409)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.
> invokeMethod(RetryInvocationHandler.java:163)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.
> invoke(RetryInvocationHandler.java:155)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(
> RetryInvocationHandler.java:95)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> RetryInvocationHandler.java:346)
>     at com.sun.proxy.$Proxy28.retrievePair(Unknown Source)
>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(
> S3NativeFileSystem.java:1213)
>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(
> EmrFileSystem.java:166)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(
> HadoopFileSystem.java:119)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(
> HadoopFileSystem.java:36)
>     at org.apache.flink.api.common.io.FileInputFormat$
> InputSplitOpenThread.run(FileInputFormat.java:865)
> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.
> ConnectionPoolTimeoutException: Timeout waiting for connection from pool
>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.
> PoolingHttpClientConnectionManager.leaseConnection(
> PoolingHttpClientConnectionManager.java:286)
>     at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.
> PoolingHttpClientConnectionManager$1.get(
> PoolingHttpClientConnectionManager.java:263)
>     at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.
> ClientConnectionRequestFactory$Handler.invoke(
> ClientConnectionRequestFactory.java:70)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.
> $Proxy30.get(Unknown Source)
>
> Thanks
>
> On Tue, Aug 14, 2018 at 9:46 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink InputFormats generate their InputSplits sequentially on the
>> JobManager.
>> These splits are stored in the heap of the JM process and handed out to
>> SourceTasks when they request them lazily.
>> Split assignment is done by an InputSplitAssigner, which can be customized.
>> FileInputFormats typically use a LocatableInputSplitAssigner which tries to
>> assign splits based on locality.
>>
>> I see three potential problems:
>> 1) InputSplit generation might take a long while. The JM is blocked until
>> splits are generated.
>> 2) All InputSplits need to be stored on the JM heap. You might need to
>> assign more memory to the JM process.
>> 3) Split assignment might take a while depending on the complexity of the
>> InputSplitAssigner. You can implement a custom assigner to make this more
>> efficient (from an assignment point of view).
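
A minimal sketch of what point 3 could look like, under stated assumptions:
the types below are local stand-ins written for illustration, not Flink
classes. The interface only mirrors the shape I understand Flink's
InputSplitAssigner to have (a method that takes the requesting host and task
id and returns the next split, or null when none are left). The assigner
ignores the host entirely and hands out splits in FIFO order, which keeps
every request O(1); that may be reasonable when splits carry no useful
locality information. The s3:// paths are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;

/** Sketch only: these types are local stand-ins, not Flink classes. */
class SplitAssignerSketch {

    /** Stand-in that mirrors the assumed shape of Flink's InputSplitAssigner. */
    interface Assigner<S> {
        /** Returns the next split for the requesting task, or null when none are left. */
        S getNextInputSplit(String host, int taskId);
    }

    /** Hands out splits in FIFO order and ignores the host, so each request is O(1). */
    static final class QueueAssigner<S> implements Assigner<S> {
        private final Deque<S> remaining;

        QueueAssigner(Collection<S> splits) {
            this.remaining = new ArrayDeque<>(splits);
        }

        @Override
        public synchronized S getNextInputSplit(String host, int taskId) {
            // poll() returns null on an empty queue, which signals "no more splits".
            return remaining.poll();
        }
    }

    public static void main(String[] args) {
        // Hypothetical paths, used only to show the call pattern.
        Assigner<String> assigner = new QueueAssigner<>(
                Arrays.asList("s3://bucket/2018-01/a.gz", "s3://bucket/2018-01/b.gz"));
        String split;
        while ((split = assigner.getNextInputSplit("host-1", 0)) != null) {
            System.out.println("assigned " + split);
        }
    }
}
```

The method is synchronized because several source tasks may request splits
concurrently; a real implementation would have to implement Flink's actual
interface and be returned from the InputFormat.
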
>>
>> Best, Fabian
>>
>> 2018-08-14 8:19 GMT+02:00 Jörn Franke <jornfra...@gmail.com>:
>>
>>> It causes more overhead (processes etc.), which might make it slower.
>>> Furthermore, if you have them stored on HDFS, then the bottleneck is the
>>> NameNode, which will have to answer millions of requests.
>>> The latter point will change in future Hadoop versions with
>>> http://ozone.hadoop.apache.org/
>>>
>>> On 13. Aug 2018, at 21:01, Darshan Singh <darshan.m...@gmail.com> wrote:
>>>
>>> Hi Guys,
>>>
>>> Is there a limit on the number of files a Flink DataSet can read? My
>>> question is whether there will be any issues if I have, say, millions of
>>> files to read to create a single DataSet.
>>>
>>> Thanks
>>>
>>>
>>
>
