It is very important to point out that the BucketingSink currently does NOT
work properly on S3. It assumes a consistent file system (i.e., that listing
and renaming work consistently), while S3 is only eventually consistent. I
assume that this eventual consistency of S3 is the cause of the error you
are seeing.
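
For context, this is roughly the kind of setup that hits the problem (just a
minimal sketch; the bucket name, path, and source below are placeholders, not
your actual job). The sink finalizes a part file by renaming it, and that
rename on s3a is where the 404 in your trace comes from:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkOnS3Sketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in a real job this would be Kafka, a file, etc.
        DataStream<String> records = env.fromElements("record-1", "record-2");

        // BucketingSink writes in-progress/pending part files and later
        // renames them to their final name. That rename path (see
        // S3AFileSystem.rename in the stack trace) relies on consistent
        // listing/metadata, which S3 only provides eventually.
        BucketingSink<String> sink =
                new BucketingSink<>("s3a://my-bucket/flink-output"); // placeholder bucket/path
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        sink.setBatchSize(128 * 1024 * 1024); // roll part files at ~128 MB

        records.addSink(sink);
        env.execute("bucketing-sink-on-s3-sketch");
    }
}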

There is a pull request for a bucketing sink that works on eventually
consistent file systems:
https://github.com/apache/flink/pull/3752
I hope we can merge it once we are done with the 1.3.2 release.

(cc-ing Gordon and Aljoscha, FYI)

On Wed, Aug 2, 2017 at 10:56 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Aneesha,
>
> the logs would show that Flink is going through a recovery cycle. Recovery
> means cancelling the running tasks and starting them again.
> If you don't see something like that in the logs, Flink just continues
> processing.
>
> I'm not familiar with the details of S3, so I can't tell if the exception
> indicates data loss.
>
> Best, Fabian
>
> 2017-08-01 20:38 GMT+02:00 Aneesha Kaushal <aneesha.kaus...@reflektion.com>:
>
>> Hello,
>>
>> I am using Flink 1.2 and writing records to S3 using the rolling sink.
>>
>> I am encountering this S3 write error quite frequently:
>>
>> TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status 
>> Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS 
>> Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: 
>> JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
>>      at 
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>>      at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>      at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>      at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>      at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 
>> 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error 
>> Code: null, AWS Error Message: Not Found, S3 Extended Request ID: 
>> JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
>>      at 
>> com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>>      at 
>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>>      at 
>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>>      at 
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>>      at 
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
>>      at 
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
>>      at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
>>      at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
>>      at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
>>      at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
>>      at 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
>>      ... 7 more
>>
>>
>> I am unable to find the cause of this error. I also have the following
>> questions regarding it:
>>
>> 1) Do we lose any data, or will Flink go back to the last checkpoint and
>> write it again?
>> 2) How can we prevent this error?
>>
>> Thanks,
>> Aneesha
>>
>>
>>
>
