Issue opened here: https://issues.apache.org/jira/browse/FLINK-11187

On Mon, Dec 17, 2018 at 2:37 PM Addison Higham <addis...@gmail.com> wrote:

> Oh this is timely!
>
> I hope I can save you some pain, Kostas! (cc-ing flink dev to get
> feedback there for what I believe to be a confirmed bug)
>
>
> I was just about to open up a flink issue for this after digging (really)
> deep and figuring out the issue over the weekend.
>
> The problem arises from the way Flink hands input streams to the
> S3AccessHelper. If you turn on debug logs for S3, you will eventually see
> this stack trace:
>
> 2018-12-17 05:55:46,546 DEBUG
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient  -
> FYI: failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>   at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>   at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>   at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>   at
> org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>   at
> org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>   at
> org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
> From this, you can see that (for some reason) AWS fails to write a
> multipart chunk and then tries to reset the input stream in order to
> retry, but the reset fails because the InputStream is not mark-able.
>
> That exception is swallowed (it seems like it should be raised up to the
> client, but isn't, for reasons I don't understand). The S3 client then
> tries to repeat the request using its built-in retry logic; however,
> because the InputStream has already been consumed and has no more bytes to
> write, we never fill the content-length that the S3 put request is
> expecting. Eventually, after it hits the max number of retries, it fails
> and you get the error above.
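>
> To make the mark/reset behavior concrete, here is a minimal, self-contained
> sketch of the plain JDK behavior involved (not Flink or SDK code): reset()
> only works if the stream's buffer and the mark's readlimit cover everything
> consumed since mark(); otherwise the mark is invalidated and the
> already-consumed bytes can never be re-sent on a retry.
>
> import java.io.BufferedInputStream;
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
> import java.io.InputStream;
>
> public class MarkResetDemo {
>     public static void main(String[] args) throws IOException {
>         byte[] part = new byte[16]; // stands in for one multipart chunk
>
>         // 4-byte buffer and 4-byte readlimit, but 8 bytes get consumed: the
>         // mark is invalidated and reset() throws "Resetting to invalid mark",
>         // the same exception as in the stack trace above.
>         InputStream tooSmall =
>                 new BufferedInputStream(new ByteArrayInputStream(part), 4);
>         tooSmall.mark(4);
>         tooSmall.read(new byte[8]);
>         try {
>             tooSmall.reset();
>         } catch (IOException e) {
>             System.out.println("reset failed: " + e.getMessage());
>         }
>
>         // Buffer and readlimit covering the whole part: reset() succeeds and
>         // a retry could re-send all 16 bytes from the beginning.
>         InputStream bigEnough =
>                 new BufferedInputStream(new ByteArrayInputStream(part), part.length);
>         bigEnough.mark(part.length);
>         bigEnough.read(new byte[8]);
>         bigEnough.reset();
>         System.out.println("available after reset: " + bigEnough.available());
>     }
> }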
>
> I just started running a fix for this (which is a hack, not the real
> solution) here:
> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>
> This whole thing is documented here:
> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>
> However, I found that just setting the documented property didn't appear
> to help, and I had to wrap the InputStream in a BufferedInputStream for it
> to work.
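>
> The idea, as a simplified sketch (illustrative names only, the actual code
> in the gist differs): wrap the part stream in a BufferedInputStream whose
> internal buffer is big enough to hold the whole part, so the mark the SDK
> sets before sending stays valid and reset() can replay the bytes on a retry.
>
> // Simplified sketch of the workaround; partFile, partLength and the
> // uploadPart call below are illustrative names, not the exact Flink API.
> InputStream raw = new FileInputStream(partFile);          // not mark-able
> int bufferSize = (int) partLength + 1;                    // covers the entire part
> InputStream markable = new BufferedInputStream(raw, bufferSize);
> s3AccessHelper.uploadPart(key, uploadId, partNumber, markable, partLength);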
>
> I think the real fix is either to:
>
> 1. Use the BufferedInputStream but make it configurable
> 2. Refactor S3AccessHelper to have another signature that takes a File
> object, and change the RefCountedFSOutputStream to also be able to give a
> reference to the underlying file (a rough sketch of this is below).
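>
> As a rough sketch of what option 2 could look like (all names and
> signatures here are hypothetical, not the current Flink API):
>
> import java.io.File;
> import java.io.IOException;
> import java.io.InputStream;
> import com.amazonaws.services.s3.model.UploadPartResult;
>
> // Hypothetical sketch of option 2: let the helper receive the staged part
> // as a File, so a retry can re-open and re-read it from the start instead
> // of depending on a one-shot InputStream. RefCountedFSOutputStream would
> // additionally expose something like getInputFile() so the writer can hand
> // that File over.
> public interface S3AccessHelper {
>
>     // existing stream-based call (simplified)
>     UploadPartResult uploadPart(
>             String key, String uploadId, int partNumber,
>             InputStream inputStream, long length) throws IOException;
>
>     // proposed overload taking the backing file
>     UploadPartResult uploadPart(
>             String key, String uploadId, int partNumber,
>             File inputFile, long length) throws IOException;
> }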
>
> I can pretty easily do this work, but I'd be curious which direction the
> maintainers would prefer.
>
> Thanks,
>
> Addison!
>
>
>
>
>
>
> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Hi Steffen,
>>
>> Thanks for reporting this.
>>
>> Internally, Flink does not keep any open connections to S3. It only
>> buffers data internally until it reaches a min-size limit (by default
>> 5MB) and then uploads it as one part of an MPU in one go. Given this, I
>> will have to dig a bit deeper to see why a connection would time out.
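>>
>> To illustrate that buffering behavior, a rough sketch with simplified
>> names (not the actual S3RecoverableFsDataOutputStream code):
>>
>> import java.io.ByteArrayOutputStream;
>>
>> // Illustrative sketch only: bytes are staged locally and shipped to S3 as
>> // one MPU part once the min part size is reached.
>> class PartBufferingSketch {
>>     private static final int MIN_PART_SIZE = 5 * 1024 * 1024; // default 5MB
>>     private ByteArrayOutputStream currentPart = new ByteArrayOutputStream();
>>     private int nextPartNumber = 1;
>>
>>     void write(byte[] data, int off, int len) {
>>         currentPart.write(data, off, len);        // stage bytes locally
>>         if (currentPart.size() >= MIN_PART_SIZE) {
>>             // ship the staged bytes as the next part of the multipart
>>             // upload, then start a fresh buffer for the following part
>>             uploadPart(nextPartNumber++, currentPart.toByteArray());
>>             currentPart = new ByteArrayOutputStream();
>>         }
>>     }
>>
>>     void uploadPart(int partNumber, byte[] bytes) {
>>         // in Flink this goes through S3AccessHelper / HadoopS3AccessHelper
>>     }
>> }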
>>
>> If you are willing to dig into the code, all interactions with S3 pass
>> through the S3AccessHelper
>> class and its implementation, the HadoopS3AccessHelper. For the buffering
>> and uploading logic,
>> you could have a look at the S3RecoverableWriter and the
>> S3RecoverableFsDataOutputStream.
>>
>> I will keep looking into it. In the meantime, if you find anything let us
>> know.
>>
>> Cheers,
>> Kostas
>>
>>
