Hi Flink Developer,

Normally when you get an internal error from AWS, you also get a 500 status code - the 200 seems odd to me.

One thing I do know is that if you’re hitting S3 hard, you have to expect and recover from errors. E.g. distcp jobs in Hadoop-land will auto-retry a failed request, because Things Go Wrong in AWS-land. So it surprises me a bit that BucketingSink is using a raw S3AFileSystem. In the absence of Hadoop 3.1 support for S3A retry policies <https://issues.apache.org/jira/browse/HADOOP-13786>, it seems like Flink would want to wrap the S3AFileSystem with something that retries requests which hit an internal error. But I haven’t walked that code, so maybe there is retry support somewhere else…
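To make the idea concrete, below is the kind of wrapper I have in mind. It’s a completely untested sketch: the class name, retry count, and backoff are made up, a real version would only retry the retryable cases (e.g. S3’s “InternalError”) rather than every IOException, and you’d still have to register it for the s3a:// scheme (e.g. via fs.s3a.impl).

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

// Untested sketch: retry rename(), which is the call that fails in the
// BucketingSink stack trace below. Names and retry settings are made up.
public class RetryingS3AFileSystem extends S3AFileSystem {

    private static final int MAX_ATTEMPTS = 3;
    private static final long BASE_BACKOFF_MS = 1000L;

    @Override
    public boolean rename(Path src, Path dst) throws IOException {
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                return super.rename(src, dst);
            } catch (IOException e) {
                // A real wrapper would inspect the exception and only retry
                // the retryable S3 errors, not every IOException.
                lastFailure = e;
                try {
                    Thread.sleep(BASE_BACKOFF_MS * attempt); // crude linear backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw lastFailure;
                }
            }
        }
        throw lastFailure;
    }
}

That would at least get you the distcp-style “try again before failing the job” behavior until the Hadoop 3.1 retry policies are available.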
— Ken

> On Dec 9, 2018, at 1:37 PM, Flink Developer <developer...@protonmail.com> wrote:
>
> Hi, is there any idea on what causes this and how it can be resolved? Thanks.
>
> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> On Wednesday, December 5, 2018 12:44 AM, Flink Developer <developer...@protonmail.com> wrote:
>
>> I have a Flink app with high parallelism (400) running in AWS EMR. It uses Flink v1.5.2. It sources Kafka and sinks to S3 using BucketingSink (with the RocksDB backend for checkpointing). The destination is defined using the "s3a://" prefix. The Flink job is a streaming app which runs continuously. At any given time, each worker may be writing to a part file in S3, so all workers combined could potentially write to 400 files (due to the 400 parallelism).
>>
>> After a few days, one of the workers fails with the exception:
>>
>> org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
>> at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
>>
>> This seems to occur when a new part file is created by the BucketingSink. The odd thing is that it happens randomly but consistently on separate job executions, and when it occurs it hits 1 of the parallel Flink workers (not all). Also, when this occurs, the Flink job transitions into a FAILING state, but the job does not restart and resume/recover from the last successful checkpoint.
>>
>> What is the cause of this and how can it be resolved? Also, how can the job be configured to restart/recover from the last successful checkpoint instead of staying in the FAILING state?

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra