It is very important to point out that the BucketingSink currently does NOT work properly on S3. It assumes a consistent file system (i.e., that listing and renaming behave consistently), but S3 is only eventually consistent. I assume that this eventual consistency of S3 is the cause of your error.
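To make the failure mode concrete: the sink commits a finished part file by renaming it, and the rename path first checks that the source object exists; on an eventually consistent store that check can 404 even though the write just succeeded. The toy model below is purely illustrative (the class names and the explicit `propagate()` step are made up, not Flink or S3A internals); it only shows why a rename guarded by an existence check can fail right after a successful write.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of an eventually consistent object store:
// writes succeed immediately, but reads lag until "propagation" happens.
public class EventualConsistencyDemo {

    static class EventuallyConsistentStore {
        private final Map<String, String> objects = new HashMap<>();
        // Keys written since the last propagation; reads cannot see them yet.
        private final Set<String> notYetVisible = new HashSet<>();

        void put(String key, String value) {
            objects.put(key, value);
            notYetVisible.add(key);
        }

        // HEAD-style metadata check: "404s" while the write has not propagated.
        boolean exists(String key) {
            return objects.containsKey(key) && !notYetVisible.contains(key);
        }

        // Rename modeled as copy + delete, guarded by an existence check on
        // the source. Fails when the source object is not visible yet —
        // analogous to the 404 in the stack trace above.
        boolean rename(String from, String to) {
            if (!exists(from)) {
                return false;
            }
            objects.put(to, objects.remove(from));
            notYetVisible.add(to);
            return true;
        }

        // Eventually, all writes become visible.
        void propagate() {
            notYetVisible.clear();
        }
    }

    public static void main(String[] args) {
        EventuallyConsistentStore store = new EventuallyConsistentStore();

        // The sink finishes a part file and immediately tries to commit it
        // by renaming the pending file to its final name.
        store.put("bucket/part-0-0.pending", "records");
        boolean committed = store.rename("bucket/part-0-0.pending", "bucket/part-0-0");
        System.out.println("commit right after write: " + committed);

        // After the write has propagated, the same rename succeeds.
        store.propagate();
        committed = store.rename("bucket/part-0-0.pending", "bucket/part-0-0");
        System.out.println("commit after propagation: " + committed);
    }
}
```

Running this prints `false` for the first commit attempt and `true` for the second: same data, same rename, only the visibility of the write differs.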
There is a pull request for a bucketing sink on eventually consistent FS:
https://github.com/apache/flink/pull/3752
Hope we can merge this once we are done with the 1.3.2 release.

(cc-ing Gordon and Aljoscha, FYI)

On Wed, Aug 2, 2017 at 10:56 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Aneesha,
>
> the logs would show that Flink is going through a recovery cycle. Recovery
> means to cancel running tasks and start them again.
> If you don't see something like that in the logs, Flink continues
> processing.
>
> I'm not familiar with the details of S3, so I can't tell if the exception
> indicates data loss.
>
> Best, Fabian
>
> 2017-08-01 20:38 GMT+02:00 Aneesha Kaushal <aneesha.kaus...@reflektion.com>:
>
>> Hello,
>>
>> I am using Flink 1.2 and writing records to S3 using the rolling sink.
>>
>> I am encountering this S3 write error quite frequently:
>>
>> TimerException{com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr}
>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS Service: Amazon S3, AWS Request ID: B573887B1850BF28, AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: JKAW8uhr/kZixw2ltGUqYdp28ssVw3zXpO/hkNvOHATOvz8bYbEE2EVxkg/vgZdr
>>     at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
>>     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java:1088)
>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:521)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:563)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:496)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:479)
>>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
>>     ... 7 more
>>
>> I am unable to find the cause of this error. I also have the following
>> questions regarding it:
>>
>> 1) Do we lose any data, or will Flink go back to the last checkpoint and
>> write again?
>> 2) How can we prevent this error?
>>
>> Thanks,
>> Aneesha
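Until the linked PR is merged, one workaround is to point the sink at a strongly consistent file system such as HDFS, where the rename/list assumptions hold. Below is a minimal configuration sketch against the Flink 1.2 BucketingSink API; the source, output path, bucket format, and batch size are made-up placeholders, not a recommendation. Checkpointing is what allows Flink to fall back to the last checkpoint and rewrite after a failure, so the sketch enables it explicitly.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing lets Flink restore state and re-emit after a failure.
        env.enableCheckpointing(60_000); // checkpoint every 60 s (example value)

        // Placeholder source; substitute your real Kafka/file/etc. source.
        DataStream<String> records = env.socketTextStream("localhost", 9999);

        // Writing to HDFS (consistent rename/list) instead of s3a:// avoids
        // the 404s seen when pending part files are renamed on S3.
        BucketingSink<String> sink = new BucketingSink<>("hdfs:///tmp/flink-output");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        sink.setBatchSize(1024 * 1024 * 128); // roll part files at ~128 MB

        records.addSink(sink);
        env.execute("bucketing-sink-sketch");
    }
}
```

This is a configuration sketch, not runnable in isolation: it needs the `flink-connector-filesystem` dependency, a Flink runtime, and a reachable HDFS.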