Issue opened here: https://issues.apache.org/jira/browse/FLINK-11187
On Mon, Dec 17, 2018 at 2:37 PM Addison Higham <addis...@gmail.com> wrote:

> Oh this is timely!
>
> I hope I can save you some pain, Kostas! (cc-ing the flink dev list to get
> feedback there on what I believe to be a confirmed bug)
>
> I was just about to open a Flink issue for this after digging (really) deep
> and figuring out the problem over the weekend.
>
> The problem arises from how Flink hands input streams to the
> S3AccessHelper. If you turn on debug logging for S3, you will eventually
> see this stack trace:
>
> 2018-12-17 05:55:46,546 DEBUG
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient -
> FYI: failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>     at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>     at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>     at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>     at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> From this you can see that (for some reason) AWS fails to write a
> multi-part chunk and then tries to reset the input stream in order to
> retry, but the reset fails because the InputStream is not mark-able.
>
> That exception is swallowed (it seems like it should be raised up to the
> client, but isn't, for an unknown reason). The S3 client then tries to
> repeat the request using its built-in retry logic. However, because the
> InputStream has already been consumed and has no more bytes to write, the
> retried request never fills the content-length that the S3 PUT request is
> expecting. Eventually, after it hits the maximum number of retries, it
> fails and you get the error above.
>
> I just started running a fix for this (which is a hack, not the real
> solution) here:
> https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6
>
> This whole issue is documented here:
> https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
>
> However, I found that just setting the documented property didn't appear
> to work, and I had to wrap the InputStream in a BufferedInputStream for it
> to work.
>
> I think the real fix is either to:
>
> 1. Use the BufferedInputStream but make it configurable, or
> 2. Refactor S3AccessHelper to add another signature that takes a File
> object and change RefCountedFSOutputStream so it can also hand out a
> reference to the underlying file.
>
> I can pretty easily do this work, but I would be curious which direction
> the maintainers would prefer.
>
> Thanks,
>
> Addison
>
>
> On Fri, Dec 14, 2018 at 8:43 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
>> Hi Steffen,
>>
>> Thanks for reporting this.
>>
>> Internally, Flink does not keep any open connections to S3. It only
>> buffers data internally until a part reaches a minimum-size limit (by
>> default 5 MB) and then uploads it as one part of an MPU in one go. Given
>> this, I will have to dig a bit deeper to see why a connection would time
>> out.
>>
>> If you are willing to dig into the code, all interactions with S3 pass
>> through the S3AccessHelper class and its implementation, the
>> HadoopS3AccessHelper. For the buffering and uploading logic, you could
>> have a look at the S3RecoverableWriter and the
>> S3RecoverableFsDataOutputStream.
>>
>> I will keep looking into it. In the meantime, if you find anything, let
>> us know.
>>
>> Cheers,
>> Kostas
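A minimal sketch of the BufferedInputStream workaround Addison describes above might look like the Java snippet below. The class and method names are illustrative only (they are not taken from the linked gist or from Flink's actual S3AccessHelper code), and the caller is assumed to know the part length up front:

    import java.io.BufferedInputStream;
    import java.io.InputStream;

    // Illustrative only: wraps the stream backing one multi-part upload part so
    // the AWS SDK's built-in retry logic can reset() back to the start of the
    // part instead of failing with "Resetting to invalid mark".
    final class MarkableStreams {

        private MarkableStreams() {}

        static InputStream markable(InputStream partData, int partLength) {
            // Buffer up to the whole part and mark it, so the mark stays valid
            // even after the SDK has read the entire part during a failed attempt.
            BufferedInputStream wrapped = new BufferedInputStream(partData, partLength);
            wrapped.mark(partLength);
            return wrapped;
        }
    }

The trade-off, and presumably why Addison calls this a hack rather than the real solution, is that each in-flight part (at least 5 MB by default) has to stay buffered in memory purely so the SDK can retry it; handing the S3AccessHelper a File reference instead, as in option 2 above, would let the retry re-read the data from disk.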