Hi Generic Flink Developer,

Normally when you get an internal error from AWS, you also get a 500 status 
code - the 200 seems odd to me.

One thing I do know is that if you’re hitting S3 hard, you have to expect and 
recover from errors.

E.g., DistCp jobs in Hadoop-land will auto-retry a failed request, because 
Things Go Wrong in AWS-land.
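
For what it's worth, the S3A client does have an SDK-level retry knob, 
fs.s3a.attempts.maximum, which controls how many times the underlying AWS SDK 
retries a request it considers retryable. A minimal sketch of bumping it (the 
bucket URI is just a placeholder, and it's not clear the SDK even treats a 
200-status InternalError as retryable, which may be exactly the problem here):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class S3ARetryConfigSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // How many times the underlying AWS SDK client retries a request
            // it classifies as retryable (throttling, 5xx, etc.).
            conf.setInt("fs.s3a.attempts.maximum", 20);

            // "s3a://my-bucket/" is a placeholder; use your own bucket.
            FileSystem fs = FileSystem.get(new URI("s3a://my-bucket/"), conf);
            System.out.println("Using filesystem: " + fs.getUri());
        }
    }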

So it surprises me a bit that BucketingSink is using a raw S3AFileSystem. In 
the absence of Hadoop 3.1's support for S3A retry policies 
<https://issues.apache.org/jira/browse/HADOOP-13786>, it seems like Flink would 
want to wrap the S3AFileSystem with something that retries requests which fail 
with an internal error.
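
Roughly the kind of wrapper I mean - purely a sketch, nothing like this exists 
in BucketingSink as far as I know, and the helper name, retry count, and 
backoff are made up:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.AWSS3IOException;

    // Hypothetical helper: retry a rename that fails with a transient S3 error.
    public class RetryingRename {

        public static boolean renameWithRetries(FileSystem fs, Path src, Path dst,
                                                int maxAttempts) throws IOException {
            IOException lastFailure = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return fs.rename(src, dst);
                } catch (AWSS3IOException e) {
                    // S3 occasionally reports transient internal errors;
                    // back off a bit and try again.
                    lastFailure = e;
                    try {
                        Thread.sleep(1000L * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted while retrying rename", ie);
                    }
                }
            }
            throw lastFailure;
        }
    }

Something like that around the rename in closeCurrentPartFile() would at least 
survive the occasional hiccup, at the cost of masking real failures for a few 
attempts.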

But I haven’t walked that code, so maybe there is retry support somewhere else…

— Ken



> On Dec 9, 2018, at 1:37 PM, Flink Developer <developer...@protonmail.com> 
> wrote:
> 
> Hi, is there any idea on what causes this and how it can be resolved? Thanks.
> 
> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> On Wednesday, December 5, 2018 12:44 AM, Flink Developer 
> <developer...@protonmail.com> wrote:
> 
>> I have a Flink app with high parallelism (400) running in AWS EMR. It uses 
>> Flink v1.5.2. It reads from Kafka and sinks to S3 using BucketingSink (with 
>> the RocksDB backend for checkpointing). The destination is defined using the 
>> "s3a://" prefix. The Flink job is a streaming app which runs continuously. 
>> At any given time, each worker may be writing to a part file in S3, so all 
>> workers combined could potentially be writing to 400 files (due to the 400 
>> parallelism). 
>> 
>> After a few days, one of the workers will fail with the exception:
>> 
>>     org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Please try again. (Service: Amazon S3; Status Code: 200; Error Code: InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy)
>>     at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1803)
>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
>> 
>> This seems to occur randomly when a new part file is created by the 
>> BucketingSink. The odd thing is that, while the timing is random, it happens 
>> consistently across separate job executions. When it occurs, it hits one of 
>> the parallel Flink workers (not all of them). Also, when this occurs, the 
>> Flink job transitions into a FAILING state but does not restart and 
>> resume/recover from the last successful checkpoint.
>> 
>> What is the cause of this and how can it be resolved? Also, how can the job 
>> be configured to restart/recover from the last successful checkpoint instead 
>> of staying in the FAILING state?
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
