[ https://issues.apache.org/jira/browse/FLINK-11187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16736889#comment-16736889 ]
Kostas Kloudas commented on FLINK-11187:
----------------------------------------

Thanks for checking this [~sthm]! It would be really helpful if you could also provide a minimal example that we can use for a test. And of course, let us know whether these changes solve the problem or not.

> StreamingFileSink with S3 backend transient socket timeout issues
> ------------------------------------------------------------------
>
>                 Key: FLINK-11187
>                 URL: https://issues.apache.org/jira/browse/FLINK-11187
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, Streaming Connectors
>    Affects Versions: 1.7.0, 1.7.1
>            Reporter: Addison Higham
>            Assignee: Addison Higham
>            Priority: Major
>             Fix For: 1.7.2, 1.8.0
>
>
> When using the StreamingFileSink with the S3A backend, errors like the following occasionally occur:
> {noformat}
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: Amazon S3; Status Code: 400; Error Code: RequestTimeout; Request ID: xxx; S3 Extended Request ID: xxx, S3 Extended Request ID: xxx
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056){noformat}
> This causes a restart of the Flink job, which it is usually able to recover from, but under heavy load these restarts can become very frequent.
>
> With debug logging enabled, the following relevant stack trace appears:
> {noformat}
> 2018-12-17 05:55:46,546 DEBUG org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient - FYI: failed to reset content inputstream before throwing up
> java.io.IOException: Resetting to invalid mark
>     at java.io.BufferedInputStream.reset(BufferedInputStream.java:448)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkBufferedInputStream.reset(SdkBufferedInputStream.java:106)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.event.ProgressInputStream.reset(ProgressInputStream.java:168)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.SdkFilterInputStream.reset(SdkFilterInputStream.java:112)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.lastReset(AmazonHttpClient.java:1145)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1070)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
>     at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:74)
>     at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:319)
>     at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748){noformat}
> This error occurs when writing a multipart chunk fails transiently and the underlying InputStream cannot be reset. The resulting ResetException should be thrown to the client (as documented here: [https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html]), but for some reason it is not. Instead, the client retries the request, now with a fully consumed InputStream. Because this InputStream is empty (or at least shorter than expected), we can never send the Content-Length worth of data that the multipart upload declares, so the socket hangs until it is eventually timed out.
>
> This failure happens roughly ~20 times before the AWS client retry logic finally gives up on the request and the socket timeout exception is thrown.
>
> As mentioned in the AWS best-practices doc, the best fix for this is to hand the SDK a File or FileInputStream object, or to use setReadLimit so the stream can be marked and reset across retries. I tried using the global SDK property (com.amazonaws.sdk.s3.defaultStreamBufferSize) to set this value, but that did not fix the problem, which I believe is because the InputStream is not mark-able and the AWS client doesn't wrap the stream.
>
> What is confirmed to work is the following patch: [https://gist.github.com/addisonj/00fc28f1f8f189380d8e53fdc887fae6]
>
> That is obviously not ideal, but it may suffice to just make that configurable.
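>
> For illustration only, here is a rough, unshaded sketch of that read-limit approach (plain com.amazonaws classes rather than Flink's relocated org.apache.flink.fs.s3base.shaded.* packages; the helper class, method, and part size below are invented for the example and this is not the gist above). The idea is to give the SDK a mark-able stream plus a read limit larger than the data sent per request, so a retry can actually reset the stream:
> {code:java}
> import java.io.BufferedInputStream;
> import java.io.InputStream;
>
> import com.amazonaws.services.s3.AmazonS3;
> import com.amazonaws.services.s3.model.UploadPartRequest;
> import com.amazonaws.services.s3.model.UploadPartResult;
>
> public class ReadLimitSketch {
>
>     // Hypothetical part size; the read limit must exceed the data sent per
>     // request, otherwise the SDK still cannot reset the stream on retry.
>     private static final int PART_SIZE = 64 * 1024 * 1024;
>
>     public static UploadPartResult uploadPart(AmazonS3 s3, String bucket, String key,
>                                               String uploadId, int partNumber,
>                                               InputStream partData, long partLength) {
>         // Global alternative mentioned above (did not help in this case,
>         // presumably because the stream handed to the SDK is not mark-able):
>         // System.setProperty("com.amazonaws.sdk.s3.defaultStreamBufferSize",
>         //         String.valueOf(PART_SIZE + 1));
>
>         UploadPartRequest request = new UploadPartRequest()
>                 .withBucketName(bucket)
>                 .withKey(key)
>                 .withUploadId(uploadId)
>                 .withPartNumber(partNumber)
>                 // Wrap in a mark-able stream so the SDK has something it can reset.
>                 .withInputStream(new BufferedInputStream(partData, PART_SIZE + 1))
>                 .withPartSize(partLength);
>
>         // Let the SDK mark/reset the whole part across retries instead of
>         // retrying with a half-consumed stream.
>         request.getRequestClientOptions().setReadLimit(PART_SIZE + 1);
>
>         return s3.uploadPart(request);
>     }
> }
> {code}
> The obvious downside is that the whole part has to be buffered on the heap for the reset to work, which is why handing the SDK a file (next paragraph) looks more attractive.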
>
> The other option is to instead expose the S3A WriteOperationHelper's option to upload a part from a file: pass a File to the S3AccessHelper and change the other relevant classes (RefCountedFSOutputStream) to expose the underlying File object so it can be handed directly to the S3A WriteOperationHelper.
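>
> To illustrate why that helps, a minimal sketch of such a file-backed part upload against the plain (unshaded) SDK; again the class and helper names are invented and this is not the actual Flink/S3A code path. Given a File instead of an InputStream, the SDK can simply re-read the data from disk on every retry, so there is nothing to mark or reset:
> {code:java}
> import java.io.File;
>
> import com.amazonaws.services.s3.AmazonS3;
> import com.amazonaws.services.s3.model.UploadPartRequest;
> import com.amazonaws.services.s3.model.UploadPartResult;
>
> public class FileBackedPartUploadSketch {
>
>     public static UploadPartResult uploadPart(AmazonS3 s3, String bucket, String key,
>                                               String uploadId, int partNumber, File partFile) {
>         UploadPartRequest request = new UploadPartRequest()
>                 .withBucketName(bucket)
>                 .withKey(key)
>                 .withUploadId(uploadId)
>                 .withPartNumber(partNumber)
>                 // A File can be re-read on retry, so no read limit is needed.
>                 .withFile(partFile)
>                 .withPartSize(partFile.length());
>         return s3.uploadPart(request);
>     }
> }
> {code}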